test: new test infrastructure
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python
2
3 from abc import abstractmethod
4
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.inet import IP, UDP
7
8
9 class BridgeDomain(object):
10     def __init__(self):
11         ## Ethernet frame which is send to pg0 interface and is forwarded to pg1
12         self.payload_0_1 = (
13             Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
14             IP(src='1.2.3.4', dst='4.3.2.1') /
15             UDP(sport=10000, dport=20000) /
16             Raw('\xa5' * 100))
17
18         ## Ethernet frame which is send to pg1 interface and is forwarded to pg0
19         self.payload_1_0 = (
20             Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
21             IP(src='4.3.2.1', dst='1.2.3.4') /
22             UDP(sport=20000, dport=10000) /
23             Raw('\xa5' * 100))
24
25     ## Test case must implement this method, so template known how to send
26     #  encapsulated frame.
27     @abstractmethod
28     def encapsulate(self, pkt):
29         pass
30
31     ## Test case must implement this method, so template known how to get
32     #  original payload.
33     @abstractmethod
34     def decapsulate(self, pkt):
35         pass
36
37     ## Test case must implement this method, so template known how if the
38     #  received frame is corectly encapsulated.
39     @abstractmethod
40     def check_encapsulation(self, pkt):
41         pass
42
43     ## On pg0 interface are encapsulated frames, on pg1 are testing frames
44     #  without encapsulation
45     def test_decap(self):
46         ## Prepare Ethernet frame that will be send encapsulated.
47         pkt_to_send = self.encapsulate(self.payload_0_1)
48
49         ## Add packet to list of packets.
50         self.pg_add_stream(0, [pkt_to_send, ])
51
52         ## Enable Packet Capture on both ports.
53         self.pg_enable_capture([0, 1])
54
55         ## Start all streams
56         self.pg_start()
57
58         ## Pick first received frame and check if is same as non-encapsulated
59         #  frame.
60         out = self.pg_get_capture(1)
61         self.assertEqual(len(out), 1,
62                          'Invalid number of packets on '
63                          'output: {}'.format(len(out)))
64         pkt = out[0]
65
66         # TODO: add error messages
67         self.assertEqual(pkt[Ether].src, self.payload_0_1[Ether].src)
68         self.assertEqual(pkt[Ether].dst, self.payload_0_1[Ether].dst)
69         self.assertEqual(pkt[IP].src, self.payload_0_1[IP].src)
70         self.assertEqual(pkt[IP].dst, self.payload_0_1[IP].dst)
71         self.assertEqual(pkt[UDP].sport, self.payload_0_1[UDP].sport)
72         self.assertEqual(pkt[UDP].dport, self.payload_0_1[UDP].dport)
73         self.assertEqual(pkt[Raw], self.payload_0_1[Raw])
74
75     ## Send non-encapsulated Ethernet frame from pg1 interface and expect
76     #  encapsulated frame on pg0. On pg0 interface are encapsulated frames,
77     #  on pg1 are testing frames without encapsulation.
78     def test_encap(self):
79         ## Create packet generator stream.
80         self.pg_add_stream(1, [self.payload_1_0])
81
82         ## Enable Packet Capture on both ports.
83         self.pg_enable_capture([0, 1])
84
85         ## Start all streams.
86         self.pg_start()
87
88         ## Pick first received frame and check if is corectly encapsulated.
89         out = self.pg_get_capture(0)
90         self.assertEqual(len(out), 1,
91                          'Invalid number of packets on '
92                          'output: {}'.format(len(out)))
93         rcvd = out[0]
94         self.check_encapsulation(rcvd)
95
96         ## Get original frame from received packet and check if is same as
97         #  sended frame.
98         rcvd_payload = self.decapsulate(rcvd)
99         # TODO: add error messages
100         self.assertEqual(rcvd_payload[Ether].src, self.payload_1_0[Ether].src)
101         self.assertEqual(rcvd_payload[Ether].dst, self.payload_1_0[Ether].dst)
102         self.assertEqual(rcvd_payload[IP].src, self.payload_1_0[IP].src)
103         self.assertEqual(rcvd_payload[IP].dst, self.payload_1_0[IP].dst)
104         self.assertEqual(rcvd_payload[UDP].sport, self.payload_1_0[UDP].sport)
105         self.assertEqual(rcvd_payload[UDP].dport, self.payload_1_0[UDP].dport)
106         self.assertEqual(rcvd_payload[Raw], self.payload_1_0[Raw])