MAP: Convert from DPO to input feature.
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
17
18
19 def verify_filter(capture, sent):
20     if not len(capture) == len(sent):
21         # filter out any IPv6 RAs from the capture
22         for p in capture:
23             if p.haslayer(IPv6):
24                 capture.remove(p)
25     return capture
26
27
28 def verify_mpls_stack(tst, rx, mpls_labels):
29     # the rx'd packet has the MPLS label popped
30     eth = rx[Ether]
31     tst.assertEqual(eth.type, 0x8847)
32
33     rx_mpls = rx[MPLS]
34
35     for ii in range(len(mpls_labels)):
36         tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
37         tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
38         tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
39
40         if ii == len(mpls_labels) - 1:
41             tst.assertEqual(rx_mpls.s, 1)
42         else:
43             # not end of stack
44             tst.assertEqual(rx_mpls.s, 0)
45             # pop the label to expose the next
46             rx_mpls = rx_mpls[MPLS].payload
47
48
49 class TestSRMPLS(VppTestCase):
50     """ SR-MPLS Test Case """
51
52     def setUp(self):
53         super(TestSRMPLS, self).setUp()
54
55         # create 2 pg interfaces
56         self.create_pg_interfaces(range(4))
57
58         # setup both interfaces
59         # assign them different tables.
60         table_id = 0
61         self.tables = []
62
63         tbl = VppMplsTable(self, 0)
64         tbl.add_vpp_config()
65         self.tables.append(tbl)
66
67         for i in self.pg_interfaces:
68             i.admin_up()
69             i.config_ip4()
70             i.resolve_arp()
71             i.config_ip6()
72             i.resolve_ndp()
73             i.enable_mpls()
74
75     def tearDown(self):
76         for i in self.pg_interfaces:
77             i.unconfig_ip4()
78             i.unconfig_ip6()
79             i.ip6_disable()
80             i.disable_mpls()
81             i.admin_down()
82         super(TestSRMPLS, self).tearDown()
83
84     def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
85         self.reset_packet_infos()
86         pkts = []
87         for i in range(0, 257):
88             info = self.create_packet_info(src_if, src_if)
89             payload = self.info_to_payload(info)
90             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
91                  IP(src=src_if.remote_ip4, dst=dst_ip,
92                     ttl=ip_ttl, tos=ip_dscp) /
93                  UDP(sport=1234, dport=1234) /
94                  Raw(payload))
95             info.data = p.copy()
96             pkts.append(p)
97         return pkts
98
99     def verify_capture_labelled_ip4(self, src_if, capture, sent,
100                                     mpls_labels, ip_ttl=None):
101         try:
102             capture = verify_filter(capture, sent)
103
104             self.assertEqual(len(capture), len(sent))
105
106             for i in range(len(capture)):
107                 tx = sent[i]
108                 rx = capture[i]
109                 tx_ip = tx[IP]
110                 rx_ip = rx[IP]
111
112                 verify_mpls_stack(self, rx, mpls_labels)
113
114                 self.assertEqual(rx_ip.src, tx_ip.src)
115                 self.assertEqual(rx_ip.dst, tx_ip.dst)
116                 if not ip_ttl:
117                     # IP processing post pop has decremented the TTL
118                     self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
119                 else:
120                     self.assertEqual(rx_ip.ttl, ip_ttl)
121
122         except:
123             raise
124
125     def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
126         try:
127             capture = verify_filter(capture, sent)
128
129             self.assertEqual(len(capture), len(sent))
130
131             for i in range(len(capture)):
132                 tx = sent[i]
133                 rx = capture[i]
134                 tx_ip = tx[IP]
135                 rx_ip = rx[IP]
136
137                 verify_mpls_stack(self, rx, mpls_labels)
138
139                 self.assertEqual(rx_ip.src, tx_ip.src)
140                 self.assertEqual(rx_ip.dst, tx_ip.dst)
141                 # IP processing post pop has decremented the TTL
142                 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
143
144         except:
145             raise
146
147     def test_sr_mpls(self):
148         """ SR MPLS """
149
150         #
151         # A simple MPLS xconnect - neos label in label out
152         #
153         route_32_eos = VppMplsRoute(self, 32, 0,
154                                     [VppRoutePath(self.pg0.remote_ip4,
155                                                   self.pg0.sw_if_index,
156                                                   labels=[VppMplsLabel(32)])])
157         route_32_eos.add_vpp_config()
158
159         #
160         # A binding SID with only one label
161         #
162         self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
163
164         #
165         # A labeled IP route that resolves thru the binding SID
166         #
167         ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
168                                  [VppRoutePath("0.0.0.0",
169                                                0xffffffff,
170                                                nh_via_label=999,
171                                                labels=[VppMplsLabel(55)])])
172         ip_10_0_0_1.add_vpp_config()
173
174         tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
175         rx = self.send_and_expect(self.pg1, tx, self.pg0)
176         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
177                                          [VppMplsLabel(32),
178                                           VppMplsLabel(55)])
179
180         #
181         # An unlabeled IP route that resolves thru the binding SID
182         #
183         ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
184                                  [VppRoutePath("0.0.0.0",
185                                                0xffffffff,
186                                                nh_via_label=999)])
187         ip_10_0_0_1.add_vpp_config()
188
189         tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
190         rx = self.send_and_expect(self.pg1, tx, self.pg0)
191         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
192                                          [VppMplsLabel(32)])
193
194         self.vapi.sr_mpls_policy_del(999)
195
196         #
197         # this time the SID has many labels pushed
198         #
199         self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
200
201         tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
202         rx = self.send_and_expect(self.pg1, tx, self.pg0)
203         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
204                                          [VppMplsLabel(32),
205                                           VppMplsLabel(33),
206                                           VppMplsLabel(34),
207                                           VppMplsLabel(55)])
208         tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
209         rx = self.send_and_expect(self.pg1, tx, self.pg0)
210         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
211                                          [VppMplsLabel(32),
212                                           VppMplsLabel(33),
213                                           VppMplsLabel(34)])
214
215         #
216         # Resolve an MPLS tunnel via the SID
217         #
218         mpls_tun = VppMPLSTunnelInterface(
219             self,
220             [VppRoutePath("0.0.0.0",
221                           0xffffffff,
222                           nh_via_label=999,
223                           labels=[VppMplsLabel(44),
224                                   VppMplsLabel(46)])])
225         mpls_tun.add_vpp_config()
226         mpls_tun.admin_up()
227
228         #
229         # add an unlabelled route through the new tunnel
230         #
231         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
232                                     [VppRoutePath("0.0.0.0",
233                                                   mpls_tun._sw_if_index)])
234         route_10_0_0_3.add_vpp_config()
235         self.logger.info(self.vapi.cli("sh mpls tun 0"))
236         self.logger.info(self.vapi.cli("sh adj 21"))
237
238         tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
239         rx = self.send_and_expect(self.pg1, tx, self.pg0)
240         self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
241                                          [VppMplsLabel(32),
242                                           VppMplsLabel(33),
243                                           VppMplsLabel(34),
244                                           VppMplsLabel(44),
245                                           VppMplsLabel(46)])
246
247         #
248         # add a labelled route through the new tunnel
249         #
250         route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
251                                     [VppRoutePath("0.0.0.0",
252                                                   mpls_tun._sw_if_index,
253                                                   labels=[VppMplsLabel(55)])])
254         route_10_0_0_3.add_vpp_config()
255
256         tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
257         rx = self.send_and_expect(self.pg1, tx, self.pg0)
258         self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
259                                          [VppMplsLabel(32),
260                                           VppMplsLabel(33),
261                                           VppMplsLabel(34),
262                                           VppMplsLabel(44),
263                                           VppMplsLabel(46),
264                                           VppMplsLabel(55)])
265
266         self.vapi.sr_mpls_policy_del(999)
267
268
269 if __name__ == '__main__':
270     unittest.main(testRunner=VppTestRunner)