xref: /freebsd-14.2/tests/atf_python/sys/net/vnet.py (revision 09d61b28)
1#!/usr/local/bin/python3
2import copy
3import ipaddress
4import os
5import re
6import socket
7import sys
8import time
9from multiprocessing import connection
10from multiprocessing import Pipe
11from multiprocessing import Process
12from typing import Dict
13from typing import List
14from typing import NamedTuple
15
16from atf_python.sys.net.tools import ToolsHelper
17from atf_python.utils import BaseTest
18from atf_python.utils import libc
19
20
21def run_cmd(cmd: str, verbose=True) -> str:
22    print("run: '{}'".format(cmd))
23    return os.popen(cmd).read()
24
25
26def get_topology_id(test_id: str) -> str:
27    """
28    Gets a unique topology id based on the pytest test_id.
29      "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" ->
30      "TestIP6Output:test_output6_pktinfo[ipandif]"
31    """
32    return ":".join(test_id.split("::")[-2:])
33
34
35def convert_test_name(test_name: str) -> str:
36    """Convert test name to a string that can be used in the file/jail names"""
37    ret = ""
38    for char in test_name:
39        if char.isalnum() or char in ("_", "-", ":"):
40            ret += char
41        elif char in ("["):
42            ret += "_"
43    return ret
44
45
46class VnetInterface(object):
47    # defines from net/if_types.h
48    IFT_LOOP = 0x18
49    IFT_ETHER = 0x06
50
51    def __init__(self, iface_alias: str, iface_name: str):
52        self.name = iface_name
53        self.alias = iface_alias
54        self.vnet_name = ""
55        self.jailed = False
56        self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
57        self.prefixes4: List[List[str]] = []
58        self.prefixes6: List[List[str]] = []
59        if iface_name.startswith("lo"):
60            self.iftype = self.IFT_LOOP
61        else:
62            self.iftype = self.IFT_ETHER
63
64    @property
65    def ifindex(self):
66        return socket.if_nametoindex(self.name)
67
68    @property
69    def first_ipv6(self):
70        d = self.addr_map["inet6"]
71        return d[next(iter(d))]
72
73    @property
74    def first_ipv4(self):
75        d = self.addr_map["inet"]
76        return d[next(iter(d))]
77
78    def set_vnet(self, vnet_name: str):
79        self.vnet_name = vnet_name
80
81    def set_jailed(self, jailed: bool):
82        self.jailed = jailed
83
84    def run_cmd(
85        self,
86        cmd,
87        verbose=False,
88    ):
89        if self.vnet_name and not self.jailed:
90            cmd = "jexec {} {}".format(self.vnet_name, cmd)
91        return run_cmd(cmd, verbose)
92
93    @classmethod
94    def setup_loopback(cls, vnet_name: str):
95        lo = VnetInterface("", "lo0")
96        lo.set_vnet(vnet_name)
97        lo.setup_addr("127.0.0.1/8")
98        lo.turn_up()
99
100    @classmethod
101    def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
102        name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
103        if not name:
104            raise Exception("Unable to create iface {}".format(iface_name))
105        ret = [cls(alias_name, name)]
106        if name.startswith("epair"):
107            ret.append(cls(alias_name, name[:-1] + "b"))
108        return ret
109
110    def setup_addr(self, _addr: str):
111        addr = ipaddress.ip_interface(_addr)
112        if addr.version == 6:
113            family = "inet6"
114            cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
115        else:
116            family = "inet"
117            if self.addr_map[family]:
118                cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr)
119            else:
120                cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
121        self.run_cmd(cmd)
122        self.addr_map[family][str(addr.ip)] = addr
123
124    def delete_addr(self, _addr: str):
125        addr = ipaddress.ip_address(_addr)
126        if addr.version == 6:
127            family = "inet6"
128            cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
129        else:
130            family = "inet"
131            cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
132        self.run_cmd(cmd)
133        del self.addr_map[family][str(addr)]
134
135    def turn_up(self):
136        cmd = "/sbin/ifconfig {} up".format(self.name)
137        self.run_cmd(cmd)
138
139    def enable_ipv6(self):
140        cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
141        self.run_cmd(cmd)
142
143    def has_tentative(self) -> bool:
144        """True if an interface has some addresses in tenative state"""
145        cmd = "/sbin/ifconfig {} inet6".format(self.name)
146        out = self.run_cmd(cmd, verbose=False)
147        for line in out.splitlines():
148            if "tentative" in line:
149                return True
150        return False
151
152
153class IfaceFactory(object):
154    INTERFACES_FNAME = "created_ifaces.lst"
155    AUTODELETE_TYPES = ("epair", "gif", "gre", "lo", "tap", "tun")
156
157    def __init__(self):
158        self.file_name = self.INTERFACES_FNAME
159
160    def _register_iface(self, iface_name: str):
161        with open(self.file_name, "a") as f:
162            f.write(iface_name + "\n")
163
164    def _list_ifaces(self) -> List[str]:
165        ret: List[str] = []
166        try:
167            with open(self.file_name, "r") as f:
168                for line in f:
169                    ret.append(line.strip())
170        except OSError:
171            pass
172        return ret
173
174    def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]:
175        ifaces = VnetInterface.create_iface(alias_name, iface_name)
176        for iface in ifaces:
177            if not self.is_autodeleted(iface.name):
178                self._register_iface(iface.name)
179        return ifaces
180
181    @staticmethod
182    def is_autodeleted(iface_name: str) -> bool:
183        iface_type = re.split(r"\d+", iface_name)[0]
184        return iface_type in IfaceFactory.AUTODELETE_TYPES
185
186    def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]:
187        """Destroys"""
188        ifaces_lst = ToolsHelper.get_output(
189            "/usr/sbin/jexec {} ifconfig -l".format(vnet_name)
190        )
191        for iface_name in ifaces_lst.split():
192            if not self.is_autodeleted(iface_name):
193                if iface_name not in self._list_ifaces():
194                    print("Skipping interface {}:{}".format(vnet_name, iface_name))
195                    continue
196            run_cmd(
197                "/usr/sbin/jexec {} ifconfig {} destroy".format(vnet_name, iface_name)
198            )
199
200    def cleanup(self):
201        try:
202            os.unlink(self.INTERFACES_FNAME)
203        except OSError:
204            pass
205
206
207class VnetInstance(object):
208    def __init__(
209        self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface]
210    ):
211        self.name = vnet_name
212        self.alias = vnet_alias  # reference in the test topology
213        self.jid = jid
214        self.ifaces = ifaces
215        self.iface_alias_map = {}  # iface.alias: iface
216        self.iface_map = {}  # iface.name: iface
217        for iface in ifaces:
218            iface.set_vnet(vnet_name)
219            iface.set_jailed(True)
220            self.iface_alias_map[iface.alias] = iface
221            self.iface_map[iface.name] = iface
222            # Allow reference to interfce aliases as attributes
223            setattr(self, iface.alias, iface)
224        self.need_dad = False  # Disable duplicate address detection by default
225        self.attached = False
226        self.pipe = None
227        self.subprocess = None
228
229    def run_vnet_cmd(self, cmd):
230        if not self.attached:
231            cmd = "jexec {} {}".format(self.name, cmd)
232        return run_cmd(cmd)
233
234    def disable_dad(self):
235        self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")
236
237    def set_pipe(self, pipe):
238        self.pipe = pipe
239
240    def set_subprocess(self, p):
241        self.subprocess = p
242
243    @staticmethod
244    def attach_jid(jid: int):
245        error_code = libc.jail_attach(jid)
246        if error_code != 0:
247            raise Exception("jail_attach() failed: errno {}".format(error_code))
248
249    def attach(self):
250        self.attach_jid(self.jid)
251        self.attached = True
252
253
254class VnetFactory(object):
255    JAILS_FNAME = "created_jails.lst"
256
257    def __init__(self, topology_id: str):
258        self.topology_id = topology_id
259        self.file_name = self.JAILS_FNAME
260        self._vnets: List[str] = []
261
262    def _register_vnet(self, vnet_name: str):
263        self._vnets.append(vnet_name)
264        with open(self.file_name, "a") as f:
265            f.write(vnet_name + "\n")
266
267    @staticmethod
268    def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
269        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
270        not_matched: List[str] = []
271        for i in range(50):
272            vnet_ifaces = run_cmd(cmd).strip().split(" ")
273            not_matched = []
274            for iface_name in ifaces:
275                if iface_name not in vnet_ifaces:
276                    not_matched.append(iface_name)
277            if len(not_matched) == 0:
278                return []
279            time.sleep(0.1)
280        return not_matched
281
282    def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]):
283        vnet_name = "pytest:{}".format(convert_test_name(self.topology_id))
284        if self._vnets:
285            # add number to distinguish jails
286            vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
287        iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
288        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
289            vnet_name, iface_cmds
290        )
291        jid = 0
292        try:
293            jid_str = run_cmd(cmd)
294            jid = int(jid_str)
295        except ValueError:
296            print("Jail creation failed, output: {}".format(jid_str))
297            raise
298        self._register_vnet(vnet_name)
299
300        # Run expedited version of routing
301        VnetInterface.setup_loopback(vnet_name)
302
303        not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
304        if not_found:
305            raise Exception(
306                "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
307            )
308        return VnetInstance(vnet_alias, vnet_name, jid, ifaces)
309
310    def cleanup(self):
311        iface_factory = IfaceFactory()
312        try:
313            with open(self.file_name) as f:
314                for line in f:
315                    vnet_name = line.strip()
316                    iface_factory.cleanup_vnet_interfaces(vnet_name)
317                    run_cmd("/usr/sbin/jail -r  {}".format(vnet_name))
318            os.unlink(self.JAILS_FNAME)
319        except OSError:
320            pass
321
322
323class SingleInterfaceMap(NamedTuple):
324    ifaces: List[VnetInterface]
325    vnet_aliases: List[str]
326
327
328class ObjectsMap(NamedTuple):
329    iface_map: Dict[str, SingleInterfaceMap]  # keyed by ifX
330    vnet_map: Dict[str, VnetInstance]  # keyed by vnetX
331    topo_map: Dict  # self.TOPOLOGY
332
333
334class VnetTestTemplate(BaseTest):
335    NEED_ROOT: bool = True
336    TOPOLOGY = {}
337
338    def _require_default_modules(self):
339        libc.kldload("if_epair.ko")
340        self.require_module("if_epair")
341
342    def _get_vnet_handler(self, vnet_alias: str):
343        handler_name = "{}_handler".format(vnet_alias)
344        return getattr(self, handler_name, None)
345
346    def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe):
347        """Base Handler to setup given VNET.
348        Can be run in a subprocess. If so, passes control to the special
349        vnetX_handler() after setting up interface addresses
350        """
351        vnet.attach()
352        print("# setup_vnet({})".format(vnet.name))
353        if pipe is not None:
354            vnet.set_pipe(pipe)
355
356        topo = obj_map.topo_map
357        ipv6_ifaces = []
358        # Disable DAD
359        if not vnet.need_dad:
360            vnet.disable_dad()
361        for iface in vnet.ifaces:
362            # check index of vnet within an interface
363            # as we have prefixes for both ends of the interface
364            iface_map = obj_map.iface_map[iface.alias]
365            idx = iface_map.vnet_aliases.index(vnet.alias)
366            prefixes6 = topo[iface.alias].get("prefixes6", [])
367            prefixes4 = topo[iface.alias].get("prefixes4", [])
368            if prefixes6 or prefixes4:
369                ipv6_ifaces.append(iface)
370                iface.turn_up()
371                if prefixes6:
372                    iface.enable_ipv6()
373            for prefix in prefixes6 + prefixes4:
374                if prefix[idx]:
375                    iface.setup_addr(prefix[idx])
376        for iface in ipv6_ifaces:
377            while iface.has_tentative():
378                time.sleep(0.1)
379
380        # Run actual handler
381        handler = self._get_vnet_handler(vnet.alias)
382        if handler:
383            # Do unbuffered stdout for children
384            # so the logs are present if the child hangs
385            sys.stdout.reconfigure(line_buffering=True)
386            self.drop_privileges()
387            handler(vnet)
388
389    def _get_topo_ifmap(self, topo: Dict):
390        iface_factory = IfaceFactory()
391        iface_map: Dict[str, SingleInterfaceMap] = {}
392        iface_aliases = set()
393        for obj_name, obj_data in topo.items():
394            if obj_name.startswith("vnet"):
395                for iface_alias in obj_data["ifaces"]:
396                    iface_aliases.add(iface_alias)
397        for iface_alias in iface_aliases:
398            print("Creating {}".format(iface_alias))
399            iface_data = topo[iface_alias]
400            iface_type = iface_data.get("type", "epair")
401            ifaces = iface_factory.create_iface(iface_alias, iface_type)
402            smap = SingleInterfaceMap(ifaces, [])
403            iface_map[iface_alias] = smap
404        return iface_map
405
406    def setup_topology(self, topo: Dict, topology_id: str):
407        """Creates jails & interfaces for the provided topology"""
408        vnet_map = {}
409        vnet_factory = VnetFactory(topology_id)
410        iface_map = self._get_topo_ifmap(topo)
411        for obj_name, obj_data in topo.items():
412            if obj_name.startswith("vnet"):
413                vnet_ifaces = []
414                for iface_alias in obj_data["ifaces"]:
415                    # epair creates 2 interfaces, grab first _available_
416                    # and map it to the VNET being created
417                    idx = len(iface_map[iface_alias].vnet_aliases)
418                    iface_map[iface_alias].vnet_aliases.append(obj_name)
419                    vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
420                vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces)
421                vnet_map[obj_name] = vnet
422                # Allow reference to VNETs as attributes
423                setattr(self, obj_name, vnet)
424        # Debug output
425        print("============= TEST TOPOLOGY =============")
426        for vnet_alias, vnet in vnet_map.items():
427            print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
428            handler = self._get_vnet_handler(vnet.alias)
429            if handler:
430                print(" handler: {}".format(handler.__name__), end="")
431            print()
432        for iface_alias, iface_data in iface_map.items():
433            vnets = iface_data.vnet_aliases
434            ifaces: List[VnetInterface] = iface_data.ifaces
435            if len(vnets) == 1 and len(ifaces) == 2:
436                print(
437                    "# iface {}: {}::{} -> main::{}".format(
438                        iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
439                    )
440                )
441            elif len(vnets) == 2 and len(ifaces) == 2:
442                print(
443                    "# iface {}: {}::{} -> {}::{}".format(
444                        iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
445                    )
446                )
447            else:
448                print(
449                    "# iface {}: ifaces: {} vnets: {}".format(
450                        iface_alias, vnets, [i.name for i in ifaces]
451                    )
452                )
453        print()
454        return ObjectsMap(iface_map, vnet_map, topo)
455
456    def setup_method(self, _method):
457        """Sets up all the required topology and handlers for the given test"""
458        super().setup_method(_method)
459        self._require_default_modules()
460
461        # TestIP6Output.test_output6_pktinfo[ipandif]
462        topology_id = get_topology_id(self.test_id)
463        topology = self.TOPOLOGY
464        # First, setup kernel objects - interfaces & vnets
465        obj_map = self.setup_topology(topology, topology_id)
466        main_vnet = None  # one without subprocess handler
467        for vnet_alias, vnet in obj_map.vnet_map.items():
468            if self._get_vnet_handler(vnet_alias):
469                # Need subprocess to run
470                parent_pipe, child_pipe = Pipe()
471                p = Process(
472                    target=self._setup_vnet,
473                    args=(
474                        vnet,
475                        obj_map,
476                        child_pipe,
477                    ),
478                )
479                vnet.set_pipe(parent_pipe)
480                vnet.set_subprocess(p)
481                p.start()
482            else:
483                if main_vnet is not None:
484                    raise Exception("there can be only 1 VNET w/o handler")
485                main_vnet = vnet
486        # Main vnet needs to be the last, so all the other subprocesses
487        # are started & their pipe handles collected
488        self.vnet = main_vnet
489        self._setup_vnet(main_vnet, obj_map, None)
490        # Save state for the main handler
491        self.iface_map = obj_map.iface_map
492        self.vnet_map = obj_map.vnet_map
493        self.drop_privileges()
494
495    def cleanup(self, test_id: str):
496        # pytest test id: file::class::test_name
497        topology_id = get_topology_id(self.test_id)
498
499        print("==== vnet cleanup ===")
500        print("# topology_id: '{}'".format(topology_id))
501        VnetFactory(topology_id).cleanup()
502        IfaceFactory().cleanup()
503
504    def wait_object(self, pipe, timeout=5):
505        if pipe.poll(timeout):
506            return pipe.recv()
507        raise TimeoutError
508
509    def wait_objects_any(self, pipe_list, timeout=5):
510        objects = connection.wait(pipe_list, timeout)
511        if objects:
512            return objects[0].recv()
513        raise TimeoutError
514
515    def send_object(self, pipe, obj):
516        pipe.send(obj)
517
518    def wait(self):
519        while True:
520            time.sleep(1)
521
522    @property
523    def curvnet(self):
524        pass
525
526
527class SingleVnetTestTemplate(VnetTestTemplate):
528    IPV6_PREFIXES: List[str] = []
529    IPV4_PREFIXES: List[str] = []
530    IFTYPE = "epair"
531
532    def _setup_default_topology(self):
533        topology = copy.deepcopy(
534            {
535                "vnet1": {"ifaces": ["if1"]},
536                "if1": {"type": self.IFTYPE, "prefixes4": [], "prefixes6": []},
537            }
538        )
539        for prefix in self.IPV6_PREFIXES:
540            topology["if1"]["prefixes6"].append((prefix,))
541        for prefix in self.IPV4_PREFIXES:
542            topology["if1"]["prefixes4"].append((prefix,))
543        return topology
544
545    def setup_method(self, method):
546        if not getattr(self, "TOPOLOGY", None):
547            self.TOPOLOGY = self._setup_default_topology()
548        else:
549            names = self.TOPOLOGY.keys()
550            assert len([n for n in names if n.startswith("vnet")]) == 1
551        super().setup_method(method)
552