refactor test framework
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python
2
3 from abc import abstractmethod
4
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.inet import IP, UDP
7
8
9 class BridgeDomain(object):
10     """ Bridge domain abstraction """
11
12     @property
13     def frame_pg0_to_pg1(self):
14         """ Ethernet frame sent from pg0 and expected to arrive at pg1 """
15         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
16                 IP(src='1.2.3.4', dst='4.3.2.1') /
17                 UDP(sport=10000, dport=20000) /
18                 Raw('\xa5' * 100))
19
20     @property
21     def frame_pg1_to_pg0(self):
22         """ Ethernet frame sent from pg1 and expected to arrive at pg0 """
23         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
24                 IP(src='4.3.2.1', dst='1.2.3.4') /
25                 UDP(sport=20000, dport=10000) /
26                 Raw('\xa5' * 100))
27
28     @abstractmethod
29     def encapsulate(self, pkt):
30         """ Encapsulate packet """
31         pass
32
33     @abstractmethod
34     def decapsulate(self, pkt):
35         """ Decapsulate packet """
36         pass
37
38     @abstractmethod
39     def check_encapsulation(self, pkt):
40         """ Verify the encapsulation """
41         pass
42
43     def test_decap(self):
44         """ Decapsulation test
45         Send encapsulated frames from pg0
46         Verify receipt of decapsulated frames on pg1
47         """
48
49         encapsulated_pkt = self.encapsulate(self.frame_pg0_to_pg1)
50
51         self.pg0.add_stream([encapsulated_pkt, ])
52
53         self.pg1.enable_capture()
54
55         self.pg_start()
56
57         # Pick first received frame and check if it's the non-encapsulated frame
58         out = self.pg1.get_capture()
59         self.assertEqual(len(out), 1,
60                          'Invalid number of packets on '
61                          'output: {}'.format(len(out)))
62         pkt = out[0]
63
64         # TODO: add error messages
65         self.assertEqual(pkt[Ether].src, self.frame_pg0_to_pg1[Ether].src)
66         self.assertEqual(pkt[Ether].dst, self.frame_pg0_to_pg1[Ether].dst)
67         self.assertEqual(pkt[IP].src, self.frame_pg0_to_pg1[IP].src)
68         self.assertEqual(pkt[IP].dst, self.frame_pg0_to_pg1[IP].dst)
69         self.assertEqual(pkt[UDP].sport, self.frame_pg0_to_pg1[UDP].sport)
70         self.assertEqual(pkt[UDP].dport, self.frame_pg0_to_pg1[UDP].dport)
71         self.assertEqual(pkt[Raw], self.frame_pg0_to_pg1[Raw])
72
73     def test_encap(self):
74         """ Encapsulation test
75         Send frames from pg1
76         Verify receipt of encapsulated frames on pg0
77         """
78         self.pg1.add_stream([self.frame_pg1_to_pg0])
79
80         self.pg0.enable_capture()
81
82         self.pg_start()
83
84         # Pick first received frame and check if it's corectly encapsulated.
85         out = self.pg0.get_capture()
86         self.assertEqual(len(out), 1,
87                          'Invalid number of packets on '
88                          'output: {}'.format(len(out)))
89         pkt = out[0]
90         self.check_encapsulation(pkt)
91
92         payload = self.decapsulate(pkt)
93         # TODO: add error messages
94         self.assertEqual(payload[Ether].src, self.frame_pg1_to_pg0[Ether].src)
95         self.assertEqual(payload[Ether].dst, self.frame_pg1_to_pg0[Ether].dst)
96         self.assertEqual(payload[IP].src, self.frame_pg1_to_pg0[IP].src)
97         self.assertEqual(payload[IP].dst, self.frame_pg1_to_pg0[IP].dst)
98         self.assertEqual(payload[UDP].sport, self.frame_pg1_to_pg0[UDP].sport)
99         self.assertEqual(payload[UDP].dport, self.frame_pg1_to_pg0[UDP].dport)
100         self.assertEqual(payload[Raw], self.frame_pg1_to_pg0[Raw])