Code Review
/
vpp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
tests: replace pycodestyle with black
[vpp.git]
/
src
/
plugins
/
map
/
examples
/
test_map.py
diff --git
a/src/plugins/map/examples/test_map.py
b/src/plugins/map/examples/test_map.py
index
7a48964
..
02df640
100755
(executable)
--- a/
src/plugins/map/examples/test_map.py
+++ b/
src/plugins/map/examples/test_map.py
@@
-1,10
+1,10
@@
#!/usr/bin/env python3
#!/usr/bin/env python3
-import time,
argparse,sys,
cmd, unittest
+import time,
argparse, sys,
cmd, unittest
from ipaddress import *
from ipaddress import *
-parser = argparse.ArgumentParser(description=
'VPP MAP test'
)
-parser.add_argument(
'-i', nargs='*'
, action="store", dest="inputdir")
+parser = argparse.ArgumentParser(description=
"VPP MAP test"
)
+parser.add_argument(
"-i", nargs="*"
, action="store", dest="inputdir")
args = parser.parse_args()
for dir in args.inputdir:
args = parser.parse_args()
for dir in args.inputdir:
@@
-14,115
+14,150
@@
from vpp_papi import *
#
# 1:1 Shared IPv4 address, shared BR (16) VPP CLI
#
#
# 1:1 Shared IPv4 address, shared BR (16) VPP CLI
#
-def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
+def lw46_shared(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
ip4_pfx = ip_network(ip4_pfx_str)
ip6_src = ip_address(ip6_src_str)
ip6_dst = ip_network(ip6_pfx_str)
ip4_pfx = ip_network(ip4_pfx_str)
ip6_src = ip_address(ip6_src_str)
ip6_dst = ip_network(ip6_pfx_str)
- ip6_nul = IPv6Address(
u'0::0'
)
+ ip6_nul = IPv6Address(
"0::0"
)
mod = ip4_pfx.num_addresses / 1024
for i in range(ip4_pfx.num_addresses):
a = time.clock()
mod = ip4_pfx.num_addresses / 1024
for i in range(ip4_pfx.num_addresses):
a = time.clock()
- t = map_add_domain(0, ip6_nul.packed, ip4_pfx[i].packed, ip6_src.packed, 0, 32, 128, ea_bits_len, psid_offset, psid_len, 0, 0)
- #print "Return from map_add_domain", t
+ t = map_add_domain(
+ 0,
+ ip6_nul.packed,
+ ip4_pfx[i].packed,
+ ip6_src.packed,
+ 0,
+ 32,
+ 128,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ 0,
+ 0,
+ )
+ # print "Return from map_add_domain", t
if t == None:
if t == None:
- print
"map_add_domain failed"
+ print
("map_add_domain failed")
continue
if t.retval != 0:
continue
if t.retval != 0:
- print
"map_add_domain failed", t
+ print
(f"map_add_domain failed, {t}")
continue
for psid in range(0x1 << int(psid_len)):
continue
for psid in range(0x1 << int(psid_len)):
- r = map_add_del_rule(0, t.index, 1, (ip6_dst[(i * (0x1<<int(psid_len))) + psid]).packed, psid)
- #print "Return from map_add_del_rule", r
+ r = map_add_del_rule(
+ 0,
+ t.index,
+ 1,
+ (ip6_dst[(i * (0x1 << int(psid_len))) + psid]).packed,
+ psid,
+ )
+ # print "Return from map_add_del_rule", r
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
- print "Running time:", time.clock() - a
+ print(f"Running time: {time.clock() - a}")
+
class TestMAP(unittest.TestCase):
class TestMAP(unittest.TestCase):
- '''
+ """
def test_delete_all(self):
t = map_domain_dump(0)
self.assertNotEqual(t, None)
def test_delete_all(self):
t = map_domain_dump(0)
self.assertNotEqual(t, None)
- print
"Number of domains configured: ", len(t
)
+ print
(f"Number of domains configured: {len(t)}"
)
for d in t:
ts = map_del_domain(0, d.domainindex)
self.assertNotEqual(ts, None)
t = map_domain_dump(0)
self.assertNotEqual(t, None)
for d in t:
ts = map_del_domain(0, d.domainindex)
self.assertNotEqual(ts, None)
t = map_domain_dump(0)
self.assertNotEqual(t, None)
- print
"Number of domains configured: ", len(t
)
- self.assertEqual(len(t), 0)
+ print
(f"Number of domains configured: {len(t)}"
)
+ self.assertEqual(len(t), 0)
/
- '''
+ """
def test_a_million_rules(self):
def test_a_million_rules(self):
- ip4_pfx =
u'192.0.2.0/24'
- ip6_pfx =
u'2001:db8::/32'
- ip6_src =
u'2001:db8::1'
+ ip4_pfx =
"192.0.2.0/24"
+ ip6_pfx =
"2001:db8::/32"
+ ip6_src =
"2001:db8::1"
psid_offset = 6
psid_len = 6
ea_bits_len = 0
lw46_shared(ip4_pfx, ip6_pfx, ip6_src, ea_bits_len, psid_offset, psid_len)
psid_offset = 6
psid_len = 6
ea_bits_len = 0
lw46_shared(ip4_pfx, ip6_pfx, ip6_src, ea_bits_len, psid_offset, psid_len)
+
#
# RX thread, that should sit on blocking vpe_api_read()
#
# RX thread, that should sit on blocking vpe_api_read()
-#
+#
#
#
#
import threading
#
#
#
import threading
-class RXThread (threading.Thread):
+
+
+class RXThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
- print
"Starting "
+ print
("Starting ")
i = 0
while True:
msg = vpe_api_read()
if msg:
i = 0
while True:
msg = vpe_api_read()
if msg:
- #print msg
- id = unpack(
'>H'
, msg[0:2])
- size = unpack(
'>H'
, msg[2:4])
- print
"Received", id, "of size", size
+ #
print msg
+ id = unpack(
">H"
, msg[0:2])
+ size = unpack(
">H"
, msg[2:4])
+ print
(f"Received {id} of size {size}")
i += 1
i += 1
- #del msg
+ #
del msg
continue
continue
- #time.sleep(0.001)
+ #
time.sleep(0.001)
return
return
+
# Create RX thread
rxthread = RXThread()
rxthread.setDaemon(True)
# Create RX thread
rxthread = RXThread()
rxthread.setDaemon(True)
-
-print
"Connect", connect_to_vpe("client124
")
+
+print
(f"Connect {connect_to_vpe('client124')}
")
import timeit
import timeit
+
rxthread.start()
rxthread.start()
-print
"After thread started"
+print
("After thread started")
-#pneum_kill_thread()
-print
"After thread killed"
+#
pneum_kill_thread()
+print
("After thread killed")
-#t = show_version(0)
-#print "Result from show version", t
+#
t = show_version(0)
+#
print "Result from show version", t
-print timeit.timeit('t = show_version(0)', number=1000, setup="from __main__ import show_version")
+print(
+ f"{timeit.timeit('t = show_version(0)', number=1000, setup='from __main__ import show_version')}"
+)
time.sleep(10)
time.sleep(10)
-#print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping")
+#
print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping")
disconnect_from_vpe()
sys.exit()
disconnect_from_vpe()
sys.exit()
-print
t.program, t.version,t.builddate,t.builddirectory
+print
(f"{t.program} {t.version}{t.builddate}{t.builddirectory}")
-'''
+"""
t = map_domain_dump(0)
if not t:
t = map_domain_dump(0)
if not t:
@@
-131,11
+166,9
@@
if not t:
for d in t:
print("IP6 prefix:",str(IPv6Address(d.ip6prefix)))
print( "IP4 prefix:",str(IPv4Address(d.ip4prefix)))
for d in t:
print("IP6 prefix:",str(IPv6Address(d.ip6prefix)))
print( "IP4 prefix:",str(IPv4Address(d.ip4prefix)))
-'''
+"""
suite = unittest.TestLoader().loadTestsFromTestCase(TestMAP)
unittest.TextTestRunner(verbosity=2).run(suite)
disconnect_from_vpe()
suite = unittest.TestLoader().loadTestsFromTestCase(TestMAP)
unittest.TextTestRunner(verbosity=2).run(suite)
disconnect_from_vpe()
-
-