misc: Fix python scripts shebang line
[vpp.git] / src / plugins / pppoe / test / test_pppoe.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.ppp import PPPoE, PPPoED, PPP
9 from scapy.layers.inet import IP
10
11 from framework import VppTestCase, VppTestRunner
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_pppoe_interface import VppPppoeInterface
14 from util import ppp, ppc
15
16
class TestPPPoE(VppTestCase):
    """ PPPoE Test Case """

    # Topology shared by all tests: PPPoE sessions are created for the
    # peers on pg0 and pg2, while a /32 route to ``dst_ip`` points out of
    # pg1 (the "server" side).

    @classmethod
    def setUpClass(cls):
        """Define the session id and server address shared by all tests."""
        super(TestPPPoE, cls).setUpClass()

        # id of the first session; multi-session tests also use id + 1
        cls.session_id = 1
        # server-side destination that is routed via pg1
        cls.dst_ip = "100.1.1.100"
        # the same address in packed binary form
        cls.dst_ipn = socket.inet_pton(socket.AF_INET, cls.dst_ip)

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the framework."""
        super(TestPPPoE, cls).tearDownClass()

    def setUp(self):
        """Create pg0..pg2, bring them up and give them IPv4 addresses."""
        super(TestPPPoE, self).setUp()

        # create 3 pg interfaces (pg0, pg1, pg2)
        self.create_pg_interfaces(range(3))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Run the framework teardown, then unconfigure the pg interfaces."""
        super(TestPPPoE, self).tearDown()

        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()

    def show_commands_at_teardown(self):
        """Log interface, PPPoE, FIB and trace state."""
        self._log_cli("show int",
                      "show pppoe fib",
                      "show pppoe session",
                      "show ip fib",
                      "show trace")

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _log_cli(self, *commands):
        """Run each CLI command and write its output to the test log."""
        for command in commands:
            self.logger.info(self.vapi.cli(command))

    def _build_stream(self, src_if, dst_if, count, make_headers):
        """Build ``count`` packets, each carrying a packet-info payload.

        :param src_if: interface the packets will be sent on
        :param dst_if: interface the packets are expected on
        :param count: number of packets to create
        :param make_headers: zero-argument callable returning the protocol
            headers; it is called once per packet and the result is
            followed by ``Raw(payload)``
        :returns: list of the created packets
        """
        packets = []
        for _ in range(count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            p = make_headers() / Raw(payload)
            # store a copy of the packet in the packet info
            info.data = p.copy()
            packets.append(p)
        return packets

    def _add_server_route(self):
        """Add and return the /32 route to ``dst_ip`` via pg1."""
        route = VppIpRoute(self, self.dst_ip, 32,
                           [VppRoutePath(self.pg1.remote_ip4,
                                         self.pg1.sw_if_index)])
        route.add_vpp_config()
        return route

    def _open_session(self, client_if, session_id):
        """Send discovery and LCP from ``client_if``, then add a session.

        :param client_if: pg interface whose remote peer is the client
        :param session_id: PPPoE session id to use
        :returns: the configured VppPppoeInterface
        """
        # Send PPPoE Discovery
        discovery = self.create_stream_pppoe_discovery(
            client_if, self.pg1, client_if.remote_mac)
        client_if.add_stream(discovery)
        self.pg_start()

        # Send PPPoE PPP LCP
        lcp = self.create_stream_pppoe_lcp(
            client_if, self.pg1, client_if.remote_mac, session_id)
        client_if.add_stream(lcp)
        self.pg_start()

        # Create PPPoE session
        pppoe_if = VppPppoeInterface(self,
                                     client_if.remote_ip4,
                                     client_if.remote_mac,
                                     session_id)
        pppoe_if.add_vpp_config()
        return pppoe_if

    def _send_and_capture(self, tx_if, stream, rx_if):
        """Send ``stream`` on ``tx_if`` and return the capture on ``rx_if``.

        One captured packet is expected per packet sent.
        """
        tx_if.add_stream(stream)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        return rx_if.get_capture(len(stream))

    def _verify_capture(self, capture, sent, session_id=None):
        """Check that IP addresses survive forwarding, packet by packet.

        :param capture: received packets
        :param sent: transmitted packets, in the same order
        :param session_id: when not None, additionally require every
            received packet to carry a PPPoE header with this session id
        """
        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            try:
                tx_ip = tx[IP]
                rx_ip = rx[IP]

                self.assertEqual(rx_ip.src, tx_ip.src)
                self.assertEqual(rx_ip.dst, tx_ip.dst)

                if session_id is not None:
                    self.assertEqual(rx[PPPoE].sessionid, session_id)

            except Exception:
                # dump the offending pair before propagating the failure
                self.logger.error(ppp("Rx:", rx))
                self.logger.error(ppp("Tx:", tx))
                raise

    # ------------------------------------------------------------------
    # packet stream builders
    # ------------------------------------------------------------------

    def create_stream_pppoe_discovery(self, src_if, dst_if,
                                      client_mac, count=1):
        """Return ``count`` PPPoE discovery packets (session id 0).

        Packets go from ``client_mac`` to the local MAC of ``src_if``.
        """
        return self._build_stream(
            src_if, dst_if, count,
            lambda: (Ether(dst=src_if.local_mac, src=client_mac) /
                     PPPoED(sessionid=0)))

    def create_stream_pppoe_lcp(self, src_if, dst_if,
                                client_mac, session_id, count=1):
        """Return ``count`` PPPoE session packets with PPP proto 0xc021."""
        return self._build_stream(
            src_if, dst_if, count,
            lambda: (Ether(dst=src_if.local_mac, src=client_mac) /
                     PPPoE(sessionid=session_id) /
                     PPP(proto=0xc021)))

    def create_stream_pppoe_ip4(self, src_if, dst_if,
                                client_mac, session_id, client_ip, count=1):
        """Return ``count`` PPPoE-encapsulated IPv4 packets.

        The inner IP packets go from ``client_ip`` to ``self.dst_ip``.
        """
        return self._build_stream(
            src_if, dst_if, count,
            lambda: (Ether(dst=src_if.local_mac, src=client_mac) /
                     PPPoE(sessionid=session_id) /
                     PPP(proto=0x0021) /
                     IP(src=client_ip, dst=self.dst_ip)))

    def create_stream_ip4(self, src_if, dst_if, client_ip, dst_ip, count=1):
        """Return ``count`` plain IPv4 packets from ``dst_ip`` to the client.

        Note the direction: ``dst_ip`` is the IP source and ``client_ip``
        the IP destination of the generated packets.
        """
        return self._build_stream(
            src_if, dst_if, count,
            lambda: (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                     IP(src=dst_ip, dst=client_ip)))

    # ------------------------------------------------------------------
    # verification
    # ------------------------------------------------------------------

    def verify_decapped_pppoe(self, src_if, capture, sent):
        """Verify decapsulated packets keep the sent IP addresses.

        ``src_if`` is accepted for interface compatibility and not used.
        """
        self._verify_capture(capture, sent)

    def verify_encaped_pppoe(self, src_if, capture, sent, session_id):
        """Verify encapsulated packets keep IP addresses and session id.

        ``src_if`` is accepted for interface compatibility and not used.
        """
        self._verify_capture(capture, sent, session_id=session_id)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_PPPoE_Decap(self):
        """ PPPoE Decap Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        pppoe_if = self._open_session(self.pg0, self.session_id)

        #
        # Send tunneled packets that match the created tunnel and
        # are decapped and forwarded
        #
        tx2 = self.create_stream_pppoe_ip4(self.pg0, self.pg1,
                                           self.pg0.remote_mac,
                                           self.session_id,
                                           self.pg0.remote_ip4)
        rx2 = self._send_and_capture(self.pg0, tx2, self.pg1)
        self.verify_decapped_pppoe(self.pg0, rx2, tx2)

        self._log_cli("show pppoe fib",
                      "show pppoe session",
                      "show ip fib")

        #
        # test case cleanup
        #
        pppoe_if.remove_vpp_config()
        route_server_dst.remove_vpp_config()

    def test_PPPoE_Encap(self):
        """ PPPoE Encap Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        pppoe_if = self._open_session(self.pg0, self.session_id)

        #
        # Send a packet stream that is routed into the session
        #  - packets are PPPoE encapped
        #
        self.vapi.cli("clear trace")
        tx2 = self.create_stream_ip4(self.pg1, self.pg0,
                                     self.pg0.remote_ip4, self.dst_ip, 65)
        rx2 = self._send_and_capture(self.pg1, tx2, self.pg0)
        self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)

        self._log_cli("show pppoe fib",
                      "show pppoe session",
                      "show ip fib",
                      "show adj")

        #
        # test case cleanup
        #
        pppoe_if.remove_vpp_config()
        route_server_dst.remove_vpp_config()

    def test_PPPoE_Add_Twice(self):
        """ PPPoE Add Same Session Twice Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        pppoe_if = self._open_session(self.pg0, self.session_id)

        #
        # The double create (create the same session twice) should fail,
        # and we should still be able to use the original
        #
        with self.assertRaises(
                Exception, msg="Double PPPoE session add does not fail"):
            pppoe_if.add_vpp_config()

        #
        # test case cleanup
        #
        pppoe_if.remove_vpp_config()
        route_server_dst.remove_vpp_config()

    def test_PPPoE_Del_Twice(self):
        """ PPPoE Delete Same Session Twice Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        pppoe_if = self._open_session(self.pg0, self.session_id)

        # Delete PPPoE session
        pppoe_if.remove_vpp_config()

        #
        # The double del (del the same session twice) should fail
        #
        with self.assertRaises(
                Exception, msg="Double PPPoE session del does not fail"):
            pppoe_if.remove_vpp_config()

        #
        # test case cleanup
        #
        route_server_dst.remove_vpp_config()

    def test_PPPoE_Decap_Multiple(self):
        """ PPPoE Decap Multiple Sessions Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        # Session 1 on pg0, session 2 on pg2
        pppoe_if1 = self._open_session(self.pg0, self.session_id)
        pppoe_if2 = self._open_session(self.pg2, self.session_id + 1)

        #
        # Send tunneled packets that match the created tunnels and
        # are decapped and forwarded
        #
        tx2 = self.create_stream_pppoe_ip4(self.pg0, self.pg1,
                                           self.pg0.remote_mac,
                                           self.session_id,
                                           self.pg0.remote_ip4)
        rx2 = self._send_and_capture(self.pg0, tx2, self.pg1)
        self.verify_decapped_pppoe(self.pg0, rx2, tx2)

        tx5 = self.create_stream_pppoe_ip4(self.pg2, self.pg1,
                                           self.pg2.remote_mac,
                                           self.session_id + 1,
                                           self.pg2.remote_ip4)
        rx5 = self._send_and_capture(self.pg2, tx5, self.pg1)
        self.verify_decapped_pppoe(self.pg2, rx5, tx5)

        self._log_cli("show pppoe fib",
                      "show pppoe session",
                      "show ip fib")

        #
        # test case cleanup
        #
        pppoe_if1.remove_vpp_config()
        pppoe_if2.remove_vpp_config()
        route_server_dst.remove_vpp_config()

    def test_PPPoE_Encap_Multiple(self):
        """ PPPoE Encap Multiple Sessions Test """

        self.vapi.cli("clear trace")

        # Add a route that resolves the server's destination
        route_server_dst = self._add_server_route()

        # Session 1 on pg0, session 2 on pg2
        pppoe_if1 = self._open_session(self.pg0, self.session_id)
        pppoe_if2 = self._open_session(self.pg2, self.session_id + 1)

        #
        # Send packet streams that are routed into the sessions
        #  - packets are PPPoE encapped
        #
        self.vapi.cli("clear trace")
        tx2 = self.create_stream_ip4(self.pg1, self.pg0,
                                     self.pg0.remote_ip4, self.dst_ip)
        rx2 = self._send_and_capture(self.pg1, tx2, self.pg0)
        self.verify_encaped_pppoe(self.pg1, rx2, tx2, self.session_id)

        tx5 = self.create_stream_ip4(self.pg1, self.pg2,
                                     self.pg2.remote_ip4, self.dst_ip)
        rx5 = self._send_and_capture(self.pg1, tx5, self.pg2)
        self.verify_encaped_pppoe(self.pg1, rx5, tx5, self.session_id + 1)

        self._log_cli("show pppoe fib",
                      "show pppoe session",
                      "show ip fib")

        #
        # test case cleanup
        #
        pppoe_if1.remove_vpp_config()
        pppoe_if2.remove_vpp_config()
        route_server_dst.remove_vpp_config()

if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)