tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python3
2
3 import abc
4
5 from scapy.layers.l2 import Ether
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8
9
class BridgeDomain(metaclass=abc.ABCMeta):
    """Bridge domain abstraction

    Template of bridge-domain test cases shared by tunnel-encapsulation
    tests.  The concrete test class must implement the abstract
    encapsulation hooks below and is expected to provide the attributes
    these tests read from ``self``:

    * ``pg0`` .. ``pg3`` -- packet-generator interfaces exposing
      ``add_stream``, ``enable_capture`` and ``get_capture`` (``pg0`` also
      ``remote_mac``)
    * ``pg_start()`` -- starts the packet generator
    * ``assertEqual()`` -- unittest-style equality assertion
    * ``single_tunnel_vni``, ``ucast_flood_bd``, ``mcast_flood_bd`` --
      identifiers passed as ``vni`` to the encapsulation hooks
    * ``n_ucast_tunnels`` -- number of frames expected by the unicast
      flood test
    """

    @property
    def frame_request(self):
        """Ethernet frame modeling a generic request"""
        # NOTE(review): the payload is given as ``str`` rather than
        # ``bytes``; how scapy encodes it decides the on-wire length --
        # confirm whether exactly 100 bytes are intended.
        return (
            Ether(src="00:00:00:00:00:01", dst="00:00:00:00:00:02")
            / IP(src="1.2.3.4", dst="4.3.2.1")
            / UDP(sport=10000, dport=20000)
            / Raw("\xa5" * 100)
        )

    @property
    def frame_reply(self):
        """Ethernet frame modeling a generic reply"""
        # Mirror image of frame_request: MACs, IPs and UDP ports swapped.
        return (
            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
            / IP(src="4.3.2.1", dst="1.2.3.4")
            / UDP(sport=20000, dport=10000)
            / Raw("\xa5" * 100)
        )

    @abc.abstractmethod
    def ip_range(self, start, end):
        """range of remote ip's

        :param start: first index of the range (passed as 10 by
            test_mcast_rcv)
        :param end: end index of the range (passed as 30 by
            test_mcast_rcv)
        :returns: iterable of source IPs; each item is handed to
            encap_mcast as ``src_ip``
        """

    @abc.abstractmethod
    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """Encapsulate mcast packet

        :param pkt: inner frame to encapsulate
        :param src_ip: outer source IP (an item produced by ip_range)
        :param src_mac: outer source MAC
        :param vni: identifier of the bridge domain / tunnel
        :returns: the encapsulated packet, ready for ``add_stream``
        """

    @abc.abstractmethod
    def encapsulate(self, pkt, vni):
        """Encapsulate packet

        :param pkt: inner frame to encapsulate
        :param vni: identifier of the tunnel
        :returns: the encapsulated packet, ready for ``add_stream``
        """

    @abc.abstractmethod
    def decapsulate(self, pkt):
        """Decapsulate packet

        :param pkt: captured encapsulated packet
        :returns: the inner frame (must carry Ether/IP/UDP/Raw layers,
            since it is compared with assert_eq_pkts)
        """

    @abc.abstractmethod
    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """Verify the encapsulation

        :param pkt: captured encapsulated packet
        :param vni: identifier the packet is expected to carry
        :param local_only: flag set to True by test_ucast_flood; its exact
            meaning is defined by the implementing subclass
        :param mcast_pkt: flag set to True by test_mcast_flood; its exact
            meaning is defined by the implementing subclass
        """
        # ``mcast_pkt`` is part of the contract because test_mcast_flood
        # passes it by keyword; implementations must accept it.

    def assert_eq_pkts(self, pkt1, pkt2):
        """Verify the Ether, IP, UDP, payload are equal in both
        packets
        """
        self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
        self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
        self.assertEqual(pkt1[IP].src, pkt2[IP].src)
        self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
        self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
        self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
        self.assertEqual(pkt1[Raw], pkt2[Raw])

    def test_decap(self):
        """Decapsulation test
        Send encapsulated frames from pg0
        Verify receipt of decapsulated frames on pg1
        """

        encapsulated_pkt = self.encapsulate(self.frame_request, self.single_tunnel_vni)

        self.pg0.add_stream(
            [
                encapsulated_pkt,
            ]
        )

        self.pg1.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's the non-encapsulated
        # frame
        out = self.pg1.get_capture(1)
        pkt = out[0]
        self.assert_eq_pkts(pkt, self.frame_request)

    def test_encap(self):
        """Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.pg1.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.single_tunnel_vni)

        # The inner frame must be the one that was sent.
        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, self.frame_reply)

    def test_ucast_flood(self):
        """Unicast flood test
        Send frames from pg3
        Verify receipt of encapsulated frames on pg0
        """
        self.pg3.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Get packet from each tunnel and assert it's correctly encapsulated.
        out = self.pg0.get_capture(self.n_ucast_tunnels)
        for pkt in out:
            self.check_encapsulation(pkt, self.ucast_flood_bd, True)
            payload = self.decapsulate(pkt)
            self.assert_eq_pkts(payload, self.frame_reply)

    def test_mcast_flood(self):
        """Multicast flood test
        Send frames from pg2
        Verify receipt of encapsulated frames on pg0
        """
        self.pg2.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(
            pkt, self.mcast_flood_bd, local_only=False, mcast_pkt=True
        )

        # The inner frame must be the one that was sent.
        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, self.frame_reply)

    def test_mcast_rcv(self):
        """Multicast receive test
        Send 20 encapsulated frames from pg0 only 10 match unicast tunnels
        Verify receipt of 10 decap frames on pg2
        """
        mac = self.pg0.remote_mac
        ip_range_start = 10
        ip_range_end = 30
        # One encapsulated copy of the request per source IP in the range.
        mcast_stream = [
            self.encap_mcast(self.frame_request, ip, mac, self.mcast_flood_bd)
            for ip in self.ip_range(ip_range_start, ip_range_end)
        ]
        self.pg0.add_stream(mcast_stream)
        self.pg2.enable_capture()
        self.pg_start()
        # Exactly 10 frames are expected to be delivered on pg2.
        out = self.pg2.get_capture(10)
        for pkt in out:
            self.assert_eq_pkts(pkt, self.frame_request)
169             self.assert_eq_pkts(pkt, self.frame_request)