misc: Fix python scripts shebang line
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
17
18
def verify_filter(capture, sent):
    """Strip IPv6 packets from *capture* when its size differs from *sent*.

    When the two sequences have the same length the capture is returned
    untouched.  Otherwise every packet for which ``haslayer(IPv6)`` is true
    is removed from *capture* in place (the intent being to discard
    unsolicited IPv6 RAs picked up alongside the expected packets).

    :param capture: mutable sequence of received packets.
    :param sent: sequence of packets that were transmitted.
    :returns: the same *capture* object, possibly with packets removed.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.
        # Walk a snapshot: removing from the sequence that is being iterated
        # skips the element following each removal, which used to leave
        # back-to-back IPv6 packets in the capture.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
26
27
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that *rx* carries exactly the expected MPLS label stack.

    The Ethernet type must be 0x8847, and each MPLS header, walked from the
    outermost inwards, must match the label value, EXP bits and TTL of the
    corresponding entry in *mpls_labels*.  Only the last expected label may
    have the bottom-of-stack bit set.

    :param tst: test case object providing ``assertEqual``.
    :param rx: received packet, indexable by the ``Ether``/``MPLS`` layers.
    :param mpls_labels: expected labels, outermost first; each entry exposes
        ``value``, ``exp`` and ``ttl``.
    """
    # the rx'd packet must be an MPLS frame
    tst.assertEqual(rx[Ether].type, 0x8847)

    hdr = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for pos, expected in enumerate(mpls_labels):
        tst.assertEqual(hdr.label, expected.value)
        tst.assertEqual(hdr.cos, expected.exp)
        tst.assertEqual(hdr.ttl, expected.ttl)

        if pos != bottom:
            # not end of stack
            tst.assertEqual(hdr.s, 0)
            # step past this label to expose the next one
            hdr = hdr[MPLS].payload
        else:
            tst.assertEqual(hdr.s, 1)
47
48
class TestSRMPLS(VppTestCase):
    """ SR-MPLS Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestSRMPLS, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRMPLS, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and the MPLS table, then bring the
        interfaces up with IPv4, IPv6 and MPLS enabled."""
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # the MPLS table (ID 0) must be configured before MPLS is enabled
        # on the interfaces below
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface configuration applied in setUp."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build a stream of 257 IPv4/UDP packets to inject on *src_if*.

        :param src_if: interface the packets will be sent from; supplies the
            MAC addresses and the IPv4 source address.
        :param dst_ip: IPv4 destination address.
        :param ip_ttl: TTL placed in the IPv4 header.
        :param ip_dscp: value placed in the IPv4 ``tos`` field.
        :returns: list of the generated packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(0, 257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(self, src_if, capture, sent,
                                    mpls_labels, ip_ttl=None):
        """Check that each captured packet is its sent packet with the
        expected MPLS stack imposed.

        :param src_if: unused; kept for interface compatibility.
        :param capture: received packets.
        :param sent: transmitted packets, in the same order.
        :param mpls_labels: expected label stack, outermost first.
        :param ip_ttl: expected IPv4 TTL of the received packets; when None
            the TTL is expected to be the sent TTL decremented by one.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            if ip_ttl is None:
                # IP forwarding has decremented the TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets sent through an MPLS tunnel.

        The checks are the same as for a labelled route with the IPv4 TTL
        expected to be decremented by one, so this delegates to
        verify_capture_labelled_ip4.

        :param src_if: unused; kept for interface compatibility.
        :param capture: received packets.
        :param sent: transmitted packets, in the same order.
        :param mpls_labels: expected label stack, outermost first.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """ SR MPLS """

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_neos = VppMplsRoute(self, 32, 0,
                                     [VppRoutePath(self.pg0.remote_ip4,
                                                   self.pg0.sw_if_index,
                                                   labels=[VppMplsLabel(32)])])
        route_32_neos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999,
                                               labels=[VppMplsLabel(55)])])
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(55)])

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999)])
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(55)])
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34)])

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [VppRoutePath("0.0.0.0",
                          0xffffffff,
                          nh_via_label=999,
                          labels=[VppMplsLabel(44),
                                  VppMplsLabel(46)])])
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index)])
        route_10_0_0_3.add_vpp_config()
        # dump tunnel and adjacency state to the log to aid debugging
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46)])

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46),
                                          VppMplsLabel(55)])

        self.vapi.sr_mpls_policy_del(999)
276
# Run this module's tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)