MPLS Exp-null Tests
[vpp.git] / test / test_mpls.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 from logging import *
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, Dot1Q, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP
14 from scapy.contrib.mpls import MPLS
15
16 class TestMPLS(VppTestCase):
17     """ MPLS Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         super(TestMPLS, cls).setUpClass()
22
23     def setUp(self):
24         super(TestMPLS, self).setUp()
25
26         # create 2 pg interfaces
27         self.create_pg_interfaces(range(3))
28
29         # setup both interfaces
30         # assign them different tables.
31         table_id = 0
32
33         for i in self.pg_interfaces:
34             i.admin_up()
35             i.set_table_ip4(table_id)
36             i.set_table_ip6(table_id)
37             i.config_ip4()
38             i.config_ip6()
39             i.enable_mpls()
40             i.resolve_arp()
41             i.resolve_ndp()
42             table_id += 1
43
44     def tearDown(self):
45         super(TestMPLS, self).tearDown()
46
47     def create_stream_ip4(self, src_if, mpls_label, mpls_ttl):
48         pkts = []
49         for i in range(0, 257):
50             info = self.create_packet_info(src_if.sw_if_index,
51                                            src_if.sw_if_index)
52             payload = self.info_to_payload(info)
53             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
54                  MPLS(label=mpls_label, ttl=mpls_ttl) /
55                  IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
56                  UDP(sport=1234, dport=1234) /
57                  Raw(payload))
58             info.data = p.copy()
59             pkts.append(p)
60         return pkts
61
62     def create_stream_ip6(self, src_if, mpls_label, mpls_ttl):
63         pkts = []
64         for i in range(0, 257):
65             info = self.create_packet_info(src_if.sw_if_index,
66                                            src_if.sw_if_index)
67             payload = self.info_to_payload(info)
68             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
69                  MPLS(label=mpls_label, ttl=mpls_ttl) /
70                  IPv6(src=src_if.remote_ip6, dst=src_if.remote_ip6) /
71                  UDP(sport=1234, dport=1234) /
72                  Raw(payload))
73             info.data = p.copy()
74             pkts.append(p)
75         return pkts
76
77     def verify_capture_ip4(self, src_if, capture, sent):
78         try:
79             self.assertEqual(len(capture), len(sent))
80
81             for i in range(len(capture)):
82                 tx = sent[i]
83                 rx = capture[i]
84
85                 # the rx'd packet has the MPLS label popped
86                 eth = rx[Ether];
87                 self.assertEqual(eth.type, 0x800);
88
89                 tx_ip = tx[IP]
90                 rx_ip = rx[IP]
91
92                 self.assertEqual(rx_ip.src, tx_ip.src)
93                 self.assertEqual(rx_ip.dst, tx_ip.dst)
94                 # IP processing post pop has decremented the TTL
95                 self.assertEqual(rx_ip.ttl+1, tx_ip.ttl)
96
97         except:
98             raise;
99
100     def verify_capture_ip6(self, src_if, capture, sent):
101         try:
102             self.assertEqual(len(capture), len(sent))
103
104             for i in range(len(capture)):
105                 tx = sent[i]
106                 rx = capture[i]
107
108                 # the rx'd packet has the MPLS label popped
109                 eth = rx[Ether];
110                 self.assertEqual(eth.type, 0x86DD);
111
112                 tx_ip = tx[IPv6]
113                 rx_ip = rx[IPv6]
114
115                 self.assertEqual(rx_ip.src, tx_ip.src)
116                 self.assertEqual(rx_ip.dst, tx_ip.dst)
117                 # IP processing post pop has decremented the TTL
118                 self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
119
120         except:
121             raise;
122
123
124     def test_v4_exp_null(self):
125         """ MPLS V4 Explicit NULL test """
126
127         #
128         # The first test case has an MPLS TTL of 0
129         # all packet should be dropped
130         #
131         tx = self.create_stream_ip4(self.pg0, 0, 0)
132         self.pg0.add_stream(tx)
133
134         self.pg_enable_capture(self.pg_interfaces)
135         self.pg_start()
136
137         rx = self.pg0.get_capture()
138
139         try:
140             self.assertEqual(0, len(rx));
141         except:
142             error("MPLS TTL=0 packets forwarded")
143             error(packet.show())
144             raise
145
146         #
147         # a stream with a non-zero MPLS TTL
148         # PG0 is in the default table
149         #
150         self.vapi.cli("clear trace")
151         tx = self.create_stream_ip4(self.pg0, 0, 2)
152         self.pg0.add_stream(tx)
153
154         self.pg_enable_capture(self.pg_interfaces)
155         self.pg_start()
156
157         rx = self.pg0.get_capture()
158         self.verify_capture_ip4(self.pg0, rx, tx)
159
160         #
161         # a stream with a non-zero MPLS TTL
162         # PG1 is in table 1
163         # we are ensuring the post-pop lookup occurs in the VRF table
164         #
165         self.vapi.cli("clear trace")
166         tx = self.create_stream_ip4(self.pg1, 0, 2)
167         self.pg1.add_stream(tx)
168
169         self.pg_enable_capture(self.pg_interfaces)
170         self.pg_start()
171
172         rx = self.pg1.get_capture()
173         self.verify_capture_ip4(self.pg0, rx, tx)
174
175     def test_v6_exp_null(self):
176         """ MPLS V6 Explicit NULL test """
177
178         #
179         # a stream with a non-zero MPLS TTL
180         # PG0 is in the default table
181         #
182         self.vapi.cli("clear trace")
183         tx = self.create_stream_ip6(self.pg0, 2, 2)
184         self.pg0.add_stream(tx)
185
186         self.pg_enable_capture(self.pg_interfaces)
187         self.pg_start()
188
189         rx = self.pg0.get_capture()
190         self.verify_capture_ip6(self.pg0, rx, tx)
191
192         #
193         # a stream with a non-zero MPLS TTL
194         # PG1 is in table 1
195         # we are ensuring the post-pop lookup occurs in the VRF table
196         #
197         self.vapi.cli("clear trace")
198         tx = self.create_stream_ip6(self.pg1, 2, 2)
199         self.pg1.add_stream(tx)
200
201         self.pg_enable_capture(self.pg_interfaces)
202         self.pg_start()
203
204         rx = self.pg1.get_capture()
205         self.verify_capture_ip6(self.pg0, rx, tx)
206
207
208 if __name__ == '__main__':
209     unittest.main(testRunner=VppTestRunner)