geneve: Fix the byte swapping for the VNI
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python3
2
3 import abc
4 import six
5
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9
10 from util import ip4_range
11
12
13 @six.add_metaclass(abc.ABCMeta)
14 class BridgeDomain(object):
15     """ Bridge domain abstraction """
16
17     @property
18     def frame_request(self):
19         """ Ethernet frame modeling a generic request """
20         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
21                 IP(src='1.2.3.4', dst='4.3.2.1') /
22                 UDP(sport=10000, dport=20000) /
23                 Raw('\xa5' * 100))
24
25     @property
26     def frame_reply(self):
27         """ Ethernet frame modeling a generic reply """
28         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
29                 IP(src='4.3.2.1', dst='1.2.3.4') /
30                 UDP(sport=20000, dport=10000) /
31                 Raw('\xa5' * 100))
32
33     @abc.abstractmethod
34     def ip_range(self, start, end):
35         """ range of remote ip's """
36         pass
37
38     @abc.abstractmethod
39     def encap_mcast(self, pkt, src_ip, src_mac, vni):
40         """ Encapsulate mcast packet """
41         pass
42
43     @abc.abstractmethod
44     def encapsulate(self, pkt, vni):
45         """ Encapsulate packet """
46         pass
47
48     @abc.abstractmethod
49     def decapsulate(self, pkt):
50         """ Decapsulate packet """
51         pass
52
53     @abc.abstractmethod
54     def check_encapsulation(self, pkt, vni, local_only=False):
55         """ Verify the encapsulation """
56         pass
57
58     def assert_eq_pkts(self, pkt1, pkt2):
59         """ Verify the Ether, IP, UDP, payload are equal in both
60         packets
61         """
62         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
63         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
64         self.assertEqual(pkt1[IP].src, pkt2[IP].src)
65         self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
66         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
67         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
68         self.assertEqual(pkt1[Raw], pkt2[Raw])
69
70     def test_decap(self):
71         """ Decapsulation test
72         Send encapsulated frames from pg0
73         Verify receipt of decapsulated frames on pg1
74         """
75
76         encapsulated_pkt = self.encapsulate(self.frame_request,
77                                             self.single_tunnel_vni)
78
79         self.pg0.add_stream([encapsulated_pkt, ])
80
81         self.pg1.enable_capture()
82
83         self.pg_start()
84
85         # Pick first received frame and check if it's the non-encapsulated
86         # frame
87         out = self.pg1.get_capture(1)
88         pkt = out[0]
89         self.assert_eq_pkts(pkt, self.frame_request)
90
91     def test_encap(self):
92         """ Encapsulation test
93         Send frames from pg1
94         Verify receipt of encapsulated frames on pg0
95         """
96         self.pg1.add_stream([self.frame_reply])
97
98         self.pg0.enable_capture()
99
100         self.pg_start()
101
102         # Pick first received frame and check if it's correctly encapsulated.
103         out = self.pg0.get_capture(1)
104         pkt = out[0]
105         self.check_encapsulation(pkt, self.single_tunnel_vni)
106
107         payload = self.decapsulate(pkt)
108         self.assert_eq_pkts(payload, self.frame_reply)
109
110     def test_ucast_flood(self):
111         """ Unicast flood test
112         Send frames from pg3
113         Verify receipt of encapsulated frames on pg0
114         """
115         self.pg3.add_stream([self.frame_reply])
116
117         self.pg0.enable_capture()
118
119         self.pg_start()
120
121         # Get packet from each tunnel and assert it's correctly encapsulated.
122         out = self.pg0.get_capture(self.n_ucast_tunnels)
123         for pkt in out:
124             self.check_encapsulation(pkt, self.ucast_flood_bd, True)
125             payload = self.decapsulate(pkt)
126             self.assert_eq_pkts(payload, self.frame_reply)
127
128     def test_mcast_flood(self):
129         """ Multicast flood test
130         Send frames from pg2
131         Verify receipt of encapsulated frames on pg0
132         """
133         self.pg2.add_stream([self.frame_reply])
134
135         self.pg0.enable_capture()
136
137         self.pg_start()
138
139         # Pick first received frame and check if it's correctly encapsulated.
140         out = self.pg0.get_capture(1)
141         pkt = out[0]
142         self.check_encapsulation(pkt, self.mcast_flood_bd,
143                                  local_only=False, mcast_pkt=True)
144
145         payload = self.decapsulate(pkt)
146         self.assert_eq_pkts(payload, self.frame_reply)
147
148     def test_mcast_rcv(self):
149         """ Multicast receive test
150         Send 20 encapsulated frames from pg0 only 10 match unicast tunnels
151         Verify receipt of 10 decap frames on pg2
152         """
153         mac = self.pg0.remote_mac
154         ip_range_start = 10
155         ip_range_end = 30
156         mcast_stream = [
157             self.encap_mcast(self.frame_request, ip, mac, self.mcast_flood_bd)
158             for ip in self.ip_range(ip_range_start, ip_range_end)]
159         self.pg0.add_stream(mcast_stream)
160         self.pg2.enable_capture()
161         self.pg_start()
162         out = self.pg2.get_capture(10)
163         for pkt in out:
164             self.assert_eq_pkts(pkt, self.frame_request)