ip: Replace Sematics for Interface IP addresses
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
17
18
19 def verify_filter(capture, sent):
20     if not len(capture) == len(sent):
21         # filter out any IPv6 RAs from the capture
22         for p in capture:
23             if p.haslayer(IPv6):
24                 capture.remove(p)
25     return capture
26
27
28 def verify_mpls_stack(tst, rx, mpls_labels):
29     # the rx'd packet has the MPLS label popped
30     eth = rx[Ether]
31     tst.assertEqual(eth.type, 0x8847)
32
33     rx_mpls = rx[MPLS]
34
35     for ii in range(len(mpls_labels)):
36         tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
37         tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
38         tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
39
40         if ii == len(mpls_labels) - 1:
41             tst.assertEqual(rx_mpls.s, 1)
42         else:
43             # not end of stack
44             tst.assertEqual(rx_mpls.s, 0)
45             # pop the label to expose the next
46             rx_mpls = rx_mpls[MPLS].payload
47
48
49 class TestSRMPLS(VppTestCase):
50     """ SR-MPLS Test Case """
51
52     @classmethod
53     def setUpClass(cls):
54         super(TestSRMPLS, cls).setUpClass()
55
56     @classmethod
57     def tearDownClass(cls):
58         super(TestSRMPLS, cls).tearDownClass()
59
60     def setUp(self):
61         super(TestSRMPLS, self).setUp()
62
63         # create 2 pg interfaces
64         self.create_pg_interfaces(range(4))
65
66         # setup both interfaces
67         # assign them different tables.
68         table_id = 0
69         self.tables = []
70
71         tbl = VppMplsTable(self, 0)
72         tbl.add_vpp_config()
73         self.tables.append(tbl)
74
75         for i in self.pg_interfaces:
76             i.admin_up()
77             i.config_ip4()
78             i.resolve_arp()
79             i.config_ip6()
80             i.resolve_ndp()
81             i.enable_mpls()
82
83     def tearDown(self):
84         for i in self.pg_interfaces:
85             i.unconfig_ip4()
86             i.unconfig_ip6()
87             i.disable_mpls()
88             i.admin_down()
89         super(TestSRMPLS, self).tearDown()
90
91     def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
92         self.reset_packet_infos()
93         pkts = []
94         for i in range(0, 257):
95             info = self.create_packet_info(src_if, src_if)
96             payload = self.info_to_payload(info)
97             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
98                  IP(src=src_if.remote_ip4, dst=dst_ip,
99                     ttl=ip_ttl, tos=ip_dscp) /
100                  UDP(sport=1234, dport=1234) /
101                  Raw(payload))
102             info.data = p.copy()
103             pkts.append(p)
104         return pkts
105
106     def verify_capture_labelled_ip4(self, src_if, capture, sent,
107                                     mpls_labels, ip_ttl=None):
108         try:
109             capture = verify_filter(capture, sent)
110
111             self.assertEqual(len(capture), len(sent))
112
113             for i in range(len(capture)):
114                 tx = sent[i]
115                 rx = capture[i]
116                 tx_ip = tx[IP]
117                 rx_ip = rx[IP]
118
119                 verify_mpls_stack(self, rx, mpls_labels)
120
121                 self.assertEqual(rx_ip.src, tx_ip.src)
122                 self.assertEqual(rx_ip.dst, tx_ip.dst)
123                 if not ip_ttl:
124                     # IP processing post pop has decremented the TTL
125                     self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
126                 else:
127                     self.assertEqual(rx_ip.ttl, ip_ttl)
128
129         except:
130             raise
131
132     def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
133         try:
134             capture = verify_filter(capture, sent)
135
136             self.assertEqual(len(capture), len(sent))
137
138             for i in range(len(capture)):
139                 tx = sent[i]
140                 rx = capture[i]
141                 tx_ip = tx[IP]
142                 rx_ip = rx[IP]
143
144                 verify_mpls_stack(self, rx, mpls_labels)
145
146                 self.assertEqual(rx_ip.src, tx_ip.src)
147                 self.assertEqual(rx_ip.dst, tx_ip.dst)
148                 # IP processing post pop has decremented the TTL
149                 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
150
151         except:
152             raise
153
154     def test_sr_mpls(self):
155         """ SR MPLS """
156
157         #
158         # A simple MPLS xconnect - neos label in label out
159         #
160         route_32_eos = VppMplsRoute(self, 32, 0,
161                                     [VppRoutePath(self.pg0.remote_ip4,
162                                                   self.pg0.sw_if_index,
163                                                   labels=[VppMplsLabel(32)])])
164         route_32_eos.add_vpp_config()
165
166         #
167         # A binding SID with only one label
168         #
169         self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
170
171         #
172         # A labeled IP route that resolves thru the binding SID
173         #
174         ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
175                                  [VppRoutePath("0.0.0.0",
176                                                0xffffffff,
177                                                nh_via_label=999,
178                                                labels=[VppMplsLabel(55)])])
179         ip_10_0_0_1.add_vpp_config()
180
181         tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
182         rx = self.send_and_expect(self.pg1, tx, self.pg0)
183         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
184                                          [VppMplsLabel(32),
185                                           VppMplsLabel(55)])
186
187         #
188         # An unlabeled IP route that resolves thru the binding SID
189         #
190         ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
191                                  [VppRoutePath("0.0.0.0",
192                                                0xffffffff,
193                                                nh_via_label=999)])
194         ip_10_0_0_1.add_vpp_config()
195
196         tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
197         rx = self.send_and_expect(self.pg1, tx, self.pg0)
198         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
199                                          [VppMplsLabel(32)])
200
201         self.vapi.sr_mpls_policy_del(999)
202
203         #
204         # this time the SID has many labels pushed
205         #
206         self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
207
208         tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
209         rx = self.send_and_expect(self.pg1, tx, self.pg0)
210         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
211                                          [VppMplsLabel(32),
212                                           VppMplsLabel(33),
213                                           VppMplsLabel(34),
214                                           VppMplsLabel(55)])
215         tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
216         rx = self.send_and_expect(self.pg1, tx, self.pg0)
217         self.verify_capture_labelled_ip4(self.pg0, rx, tx,
218                                          [VppMplsLabel(32),
219                                           VppMplsLabel(33),
220                                           VppMplsLabel(34)])
221
222         #
223         # Resolve an MPLS tunnel via the SID
224         #
225         mpls_tun = VppMPLSTunnelInterface(
226             self,
227             [VppRoutePath("0.0.0.0",
228                           0xffffffff,
229                           nh_via_label=999,
230                           labels=[VppMplsLabel(44),
231                                   VppMplsLabel(46)])])
232         mpls_tun.add_vpp_config()
233         mpls_tun.admin_up()
234
235         #
236         # add an unlabelled route through the new tunnel
237         #
238         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
239                                     [VppRoutePath("0.0.0.0",
240                                                   mpls_tun._sw_if_index)])
241         route_10_0_0_3.add_vpp_config()
242         self.logger.info(self.vapi.cli("sh mpls tun 0"))
243         self.logger.info(self.vapi.cli("sh adj 21"))
244
245         tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
246         rx = self.send_and_expect(self.pg1, tx, self.pg0)
247         self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
248                                          [VppMplsLabel(32),
249                                           VppMplsLabel(33),
250                                           VppMplsLabel(34),
251                                           VppMplsLabel(44),
252                                           VppMplsLabel(46)])
253
254         #
255         # add a labelled route through the new tunnel
256         #
257         route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
258                                     [VppRoutePath("0.0.0.0",
259                                                   mpls_tun._sw_if_index,
260                                                   labels=[VppMplsLabel(55)])])
261         route_10_0_0_3.add_vpp_config()
262
263         tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
264         rx = self.send_and_expect(self.pg1, tx, self.pg0)
265         self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
266                                          [VppMplsLabel(32),
267                                           VppMplsLabel(33),
268                                           VppMplsLabel(34),
269                                           VppMplsLabel(44),
270                                           VppMplsLabel(46),
271                                           VppMplsLabel(55)])
272
273         self.vapi.sr_mpls_policy_del(999)
274
275
276 if __name__ == '__main__':
277     unittest.main(testRunner=VppTestRunner)