lb: remove api boilerplate
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python
2
3 import abc
4 import six
5
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet import IP, UDP
8
9 from util import ip4_range
10
11
12 @six.add_metaclass(abc.ABCMeta)
13 class BridgeDomain(object):
14     """ Bridge domain abstraction """
15
16     @property
17     def frame_request(self):
18         """ Ethernet frame modeling a generic request """
19         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
20                 IP(src='1.2.3.4', dst='4.3.2.1') /
21                 UDP(sport=10000, dport=20000) /
22                 Raw('\xa5' * 100))
23
24     @property
25     def frame_reply(self):
26         """ Ethernet frame modeling a generic reply """
27         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
28                 IP(src='4.3.2.1', dst='1.2.3.4') /
29                 UDP(sport=20000, dport=10000) /
30                 Raw('\xa5' * 100))
31
32     @abc.abstractmethod
33     def ip_range(self, start, end):
34         """ range of remote ip's """
35         pass
36
37     @abc.abstractmethod
38     def encap_mcast(self, pkt, src_ip, src_mac, vni):
39         """ Encapsulate mcast packet """
40         pass
41
42     @abc.abstractmethod
43     def encapsulate(self, pkt, vni):
44         """ Encapsulate packet """
45         pass
46
47     @abc.abstractmethod
48     def decapsulate(self, pkt):
49         """ Decapsulate packet """
50         pass
51
52     @abc.abstractmethod
53     def check_encapsulation(self, pkt, vni, local_only=False):
54         """ Verify the encapsulation """
55         pass
56
57     def assert_eq_pkts(self, pkt1, pkt2):
58         """ Verify the Ether, IP, UDP, payload are equal in both
59         packets
60         """
61         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
62         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
63         self.assertEqual(pkt1[IP].src, pkt2[IP].src)
64         self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
65         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
66         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
67         self.assertEqual(pkt1[Raw], pkt2[Raw])
68
69     def test_decap(self):
70         """ Decapsulation test
71         Send encapsulated frames from pg0
72         Verify receipt of decapsulated frames on pg1
73         """
74
75         encapsulated_pkt = self.encapsulate(self.frame_request,
76                                             self.single_tunnel_bd)
77
78         self.pg0.add_stream([encapsulated_pkt, ])
79
80         self.pg1.enable_capture()
81
82         self.pg_start()
83
84         # Pick first received frame and check if it's the non-encapsulated
85         # frame
86         out = self.pg1.get_capture(1)
87         pkt = out[0]
88         self.assert_eq_pkts(pkt, self.frame_request)
89
90     def test_encap(self):
91         """ Encapsulation test
92         Send frames from pg1
93         Verify receipt of encapsulated frames on pg0
94         """
95         self.pg1.add_stream([self.frame_reply])
96
97         self.pg0.enable_capture()
98
99         self.pg_start()
100
101         # Pick first received frame and check if it's correctly encapsulated.
102         out = self.pg0.get_capture(1)
103         pkt = out[0]
104         self.check_encapsulation(pkt, self.single_tunnel_bd)
105
106         payload = self.decapsulate(pkt)
107         self.assert_eq_pkts(payload, self.frame_reply)
108
109     def test_ucast_flood(self):
110         """ Unicast flood test
111         Send frames from pg3
112         Verify receipt of encapsulated frames on pg0
113         """
114         self.pg3.add_stream([self.frame_reply])
115
116         self.pg0.enable_capture()
117
118         self.pg_start()
119
120         # Get packet from each tunnel and assert it's correctly encapsulated.
121         out = self.pg0.get_capture(self.n_ucast_tunnels)
122         for pkt in out:
123             self.check_encapsulation(pkt, self.ucast_flood_bd, True)
124             payload = self.decapsulate(pkt)
125             self.assert_eq_pkts(payload, self.frame_reply)
126
127     def test_mcast_flood(self):
128         """ Multicast flood test
129         Send frames from pg2
130         Verify receipt of encapsulated frames on pg0
131         """
132         self.pg2.add_stream([self.frame_reply])
133
134         self.pg0.enable_capture()
135
136         self.pg_start()
137
138         # Pick first received frame and check if it's correctly encapsulated.
139         out = self.pg0.get_capture(1)
140         pkt = out[0]
141         self.check_encapsulation(pkt, self.mcast_flood_bd,
142                                  local_only=False, mcast_pkt=True)
143
144         payload = self.decapsulate(pkt)
145         self.assert_eq_pkts(payload, self.frame_reply)
146
147     def test_mcast_rcv(self):
148         """ Multicast receive test
149         Send 20 encapsulated frames from pg0 only 10 match unicast tunnels
150         Verify receipt of 10 decap frames on pg2
151         """
152         mac = self.pg0.remote_mac
153         ip_range_start = 10
154         ip_range_end = 30
155         mcast_stream = [
156             self.encap_mcast(self.frame_request, ip, mac, self.mcast_flood_bd)
157             for ip in self.ip_range(ip_range_start, ip_range_end)]
158         self.pg0.add_stream(mcast_stream)
159         self.pg2.enable_capture()
160         self.pg_start()
161         out = self.pg2.get_capture(10)
162         for pkt in out:
163             self.assert_eq_pkts(pkt, self.frame_request)