SR-MPLS: fixes and tests
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
8     VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
9     MRouteItfFlags, MRouteEntryFlags, DpoProto, VppIpTable, VppMplsTable, \
10     VppMplsLabel, MplsLspMode
11 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
12
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether
15 from scapy.layers.inet import IP, UDP, ICMP
16 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
17 from scapy.contrib.mpls import MPLS
18
19
def verify_filter(capture, sent):
    """Strip IPv6 packets from ``capture`` when its length differs from
    ``sent``.

    A length mismatch is taken to mean that unsolicited IPv6 packets
    (router advertisements) were captured alongside the expected ones.

    :param capture: captured packets; modified in place.
    :param sent: the packets that were transmitted; only its length is used.
    :returns: the same ``capture`` object, filtered if the lengths differed.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.
        # Iterate over a snapshot: removing from the sequence that is being
        # iterated skips the element following each removal, which would
        # leave back-to-back RAs in the capture.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
27
28
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that ``rx`` carries exactly the label stack ``mpls_labels``.

    :param tst: test case object providing ``assertEqual``.
    :param rx: received packet; indexed by the Ether and MPLS layers.
    :param mpls_labels: expected labels, outermost first; each entry's
                        ``value``, ``exp`` and ``ttl`` are compared.
    """
    # the rx'd frame must carry the MPLS ethertype
    tst.assertEqual(rx[Ether].type, 0x8847)

    hdr = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for depth, expected in enumerate(mpls_labels):
        tst.assertEqual(hdr.label, expected.value)
        tst.assertEqual(hdr.cos, expected.exp)
        tst.assertEqual(hdr.ttl, expected.ttl)

        if depth != bottom:
            # not end of stack
            tst.assertEqual(hdr.s, 0)
            # step past this label to expose the next one
            hdr = hdr[MPLS].payload
        else:
            # the last expected label must have the bottom-of-stack bit set
            tst.assertEqual(hdr.s, 1)
48
49
class TestSRMPLS(VppTestCase):
    """ SR-MPLS Test Case """

    def setUp(self):
        # Bring up four pg interfaces with IPv4, IPv6 and MPLS enabled,
        # and create MPLS table 0.
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # create the MPLS table and remember it in self.tables
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # setup all the interfaces
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        # Undo the per-interface configuration applied in setUp.
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        # Build 257 Ether/IP/UDP packets from src_if's remote host to dst_ip.
        #
        # ip_ttl and ip_dscp set the IP header's ttl and tos fields.
        # Returns the list of packets; a copy of each is stored in its
        # packet-info record.
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(self, src_if, capture, sent,
                                    mpls_labels, ip_ttl=None):
        # Check that every captured packet is the matching sent packet with
        # the label stack mpls_labels imposed.
        #
        # src_if is unused; it is kept for interface compatibility.
        # ip_ttl is the expected IP TTL of the received packets; when None
        # the TTL is expected to be one less than the sent TTL.
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # compare against None so that an explicit expected TTL of 0
            # is honoured rather than treated as "not given"
            if ip_ttl is None:
                # IP processing has decremented the TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        # Check packets sent through an MPLS tunnel: same addresses, the
        # label stack mpls_labels, and an IP TTL decremented by one.
        #
        # These are exactly the labelled checks with no explicit TTL.
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """ SR MPLS """

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_neos = VppMplsRoute(self, 32, 0,
                                     [VppRoutePath(self.pg0.remote_ip4,
                                                   self.pg0.sw_if_index,
                                                   labels=[VppMplsLabel(32)])])
        route_32_neos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999,
                                               labels=[VppMplsLabel(55)])])
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(55)])

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999)])
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(55)])
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34)])

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [VppRoutePath("0.0.0.0",
                          0xffffffff,
                          nh_via_label=999,
                          labels=[VppMplsLabel(44),
                                  VppMplsLabel(46)])])
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index)])
        route_10_0_0_3.add_vpp_config()
        # debug output only; NOTE(review): the tunnel and adjacency indices
        # are hard-coded - confirm they still match what VPP allocates
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46)])

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46),
                                          VppMplsLabel(55)])

        self.vapi.sr_mpls_policy_del(999)
269
# when executed as a script, run this module's tests with the VPP test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)