From c79e094f8f198ada621283a7de5687e0c2e3b5fc Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Wed, 27 May 2026 11:17:12 +0300 Subject: [PATCH] New: OsOperations::is_port_available(ip: str, number: int) This method replaces OsOperations::is_port_free(number: int) - It respects IP - It supports IPv6 Tests are added. --- .github/workflows/ci.yml | 2 +- Dockerfile--astralinux_1_7.tmpl | 3 + Dockerfile--ubuntu_24_04.tmpl | 3 + run_tests3.sh | 4 + src/local_ops.py | 47 ++++++++++ src/os_ops.py | 8 ++ src/remote_ops.py | 108 ++++++++++++++++++++++ tests/test_os_ops_common.py | 156 ++++++++++++++++++++++++++++++++ 8 files changed, 330 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 81f4860..85aa409 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,7 +117,7 @@ jobs: - name: Build local image ${{ matrix.alpine }} run: docker build --build-arg PYTHON_VERSION="${{ matrix.python }}" -t "${{ env.RUN_CFG__DOCKER_IMAGE_NAME }}" -f Dockerfile--${{ matrix.platform }}.tmpl . - name: Run - run: docker run -t -v ${{ github.workspace }}/${{ env.RUN_CFG__LOGS_DIR }}:/home/test/testgres/logs "${{ env.RUN_CFG__DOCKER_IMAGE_NAME }}" + run: docker run -t --cap-add=NET_ADMIN -v ${{ github.workspace }}/${{ env.RUN_CFG__LOGS_DIR }}:/home/test/testgres/logs "${{ env.RUN_CFG__DOCKER_IMAGE_NAME }}" - name: Upload Logs uses: actions/upload-artifact@v7 if: always() # IT IS IMPORTANT! diff --git a/Dockerfile--astralinux_1_7.tmpl b/Dockerfile--astralinux_1_7.tmpl index b68b2fa..5c866d1 100644 --- a/Dockerfile--astralinux_1_7.tmpl +++ b/Dockerfile--astralinux_1_7.tmpl @@ -12,6 +12,9 @@ RUN apt install -y openssh-server RUN apt install -y git +# this installs "ip" utility +RUN apt install -y iproute2 + # RUN apt install -y mc # RUN apt install -y nano diff --git a/Dockerfile--ubuntu_24_04.tmpl b/Dockerfile--ubuntu_24_04.tmpl index d953a52..84a4938 100644 --- a/Dockerfile--ubuntu_24_04.tmpl +++ b/Dockerfile--ubuntu_24_04.tmpl @@ -12,6 +12,9 @@ RUN apt install -y netcat-traditional RUN apt install -y git +# this installs "ip" utility +RUN apt install -y iproute2 + # RUN apt install -y mc # --------------------------------------------- base2_with_python-3 diff --git a/run_tests3.sh b/run_tests3.sh index 00c7b7b..d281012 100755 --- a/run_tests3.sh +++ b/run_tests3.sh @@ -2,6 +2,10 @@ set -eux +for i in {2..11}; do + sudo ip addr add 127.0.0.$i/32 dev lo +done + # prepare python environment VENV_PATH="/tmp/testgres_venv" rm -rf $VENV_PATH diff --git a/src/local_ops.py b/src/local_ops.py index ebc1591..ea378a7 100644 --- a/src/local_ops.py +++ b/src/local_ops.py @@ -16,6 +16,7 @@ import threading import copy import signal as os_signal +import ipaddress from .exceptions import ExecUtilException from .exceptions import InvalidOperationException @@ -599,6 +600,52 @@ def is_port_free(self, number: int) -> bool: except OSError: return False + def is_port_available(self, ip: str, number: int) -> bool: + assert type(ip) is str + assert ip != "" + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + + try: + addr = ipaddress.ip_address(ip) + if addr.version == 4: + return __class__._is_port_available_vX( + ip, + number, + socket.AF_INET, + ) + if addr.version == 6: + return self._is_port_available_vX( + ip, + number, + socket.AF_INET6, + ) + except ValueError: + raise RuntimeError("Unknown format of IP: {!r}".format(ip)) + + raise RuntimeError("Unsupported IP version: {!r}".format(ip)) + + @staticmethod + def _is_port_available_vX( + ip: str, + number: int, + address_family: socket.AddressFamily, + ) -> bool: + assert type(ip) is str + assert ip != "" + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + + with socket.socket(address_family, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind((ip, number)) + return True + except OSError: + return False + def get_tempdir(self) -> str: r = tempfile.gettempdir() assert r is not None diff --git a/src/os_ops.py b/src/os_ops.py index 01ee347..451c4ca 100644 --- a/src/os_ops.py +++ b/src/os_ops.py @@ -148,5 +148,13 @@ def is_port_free(self, number: int): assert type(number) is int raise NotImplementedError() + def is_port_available(self, ip: str, number: int) -> bool: + assert type(ip) is str + assert ip != "" + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + raise NotImplementedError() + def get_tempdir(self) -> str: raise NotImplementedError() diff --git a/src/remote_ops.py b/src/remote_ops.py index 5f7fbce..f9dc1e3 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -11,6 +11,8 @@ import copy import re import signal as os_signal +import struct +import ipaddress from .exceptions import ExecUtilException from .exceptions import InvalidOperationException @@ -734,6 +736,112 @@ def is_port_free(self, number: int) -> bool: out=output ) + def is_port_available(self, ip: str, number: int) -> bool: + assert type(ip) is str + assert ip != "" + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + + try: + addr = ipaddress.ip_address(ip) + if addr.version == 4: + return self._is_port_available_v4( + addr, + number, + ) + if addr.version == 6: + return self._is_port_available_v6( + addr, + number, + ) + except ValueError: + raise RuntimeError("Unknown format of IP: {!r}".format(ip)) + + raise RuntimeError("Unsupported IP version: {!r}".format(ip)) + + # -------------------------------------------------------------------- + def _is_port_available_v4(self, addr: ipaddress.IPv4Address, number: int) -> bool: + assert type(addr) is ipaddress.IPv4Address + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + + ip_packed = addr.packed + + # 1. The IP address really depends on the architecture (we only flip it) + ip_le = format(struct.unpack("I", ip_packed)[0], "08X") + ip_be = format(struct.unpack("!I", ip_packed)[0], "08X") + + # 2. The port is ALWAYS output in Big Endian (just convert the number to HEX) + port_hex = format(number, "04X") + + # 3. Build a command + # Byte 0: 0x7F + # Byte 1: 'E' + # Byte 2: 'L' + # Byte 3: 'F' + # Byte 4: Bit class (01 — 32-bit, 02 — 64-bit) + # Byte 5: Byte order (01 — Little Endian, 02 — Big Endian) + grep_cmd_s = ( + 'if od -An -t x1 -N 1 -j 5 /bin/bash | grep -q "01"; then ' + ' if grep -q -E "^\\s*[0-9]+:\\s*' + ip_le + ':' + port_hex + '\\s+" /proc/net/tcp; then echo "BUSY"; else echo "FREE"; fi; ' + 'else ' + ' if grep -q -E "^\\s*[0-9]+:\\s*' + ip_be + ':' + port_hex + '\\s+" /proc/net/tcp; then echo "BUSY"; else echo "FREE"; fi; ' + 'fi' + ) + + return self._run_grep(grep_cmd_s) + + # -------------------------------------------------------------------- + def _is_port_available_v6(self, addr: ipaddress.IPv6Address, number: int) -> bool: + assert type(addr) is ipaddress.IPv6Address + assert type(number) is int + assert number >= 0 + assert number <= 65535 # OK? + + ip_bytes = addr.packed + words = struct.unpack("!IIII", ip_bytes) + + # 1. The IP address really depends on the architecture (we only flip it) + ip_le = "".join(format(struct.unpack("I", w))[0], "08X") for w in words) + ip_be = "".join(format(w, "08X") for w in words) + + # 2. The port is ALWAYS output in Big Endian (just convert the number to HEX) + port_hex = format(number, "04X") + + # 3. Bash script checks architecture: if 5th byte of /bin/bash is 1, then it is Little Endian + grep_cmd_s = ( + 'if od -An -t x1 -N 1 -j 5 /bin/bash | grep -q "01"; then ' + ' if grep -q -E "^\\s*[0-9]+:\\s*' + ip_le + ':' + port_hex + '\\s+" /proc/net/tcp6; then echo "BUSY"; else echo "FREE"; fi; ' + 'else ' + ' if grep -q -E "^\\s*[0-9]+:\\s*' + ip_be + ':' + port_hex + '\\s+" /proc/net/tcp6; then echo "BUSY"; else echo "FREE"; fi; ' + 'fi' + ) + + return self._run_grep(grep_cmd_s) + + # -------------------------------------------------------------------- + def _run_grep(self, grep_cmd_s: str) -> bool: + assert type(grep_cmd_s) is str + + cmd = ["/bin/bash", "-c", grep_cmd_s] + + output = self.exec_command( + cmd=cmd, + encoding=get_default_encoding(), + ) + + if output == "BUSY\n": + return False + + if output == "FREE\n": + return True + + errMsg = "grep returned unexpected output: {!r}".format(output) + raise RuntimeError(errMsg) + + # -------------------------------------------------------------------- def get_tempdir(self) -> str: command = ["mktemp", "-u", "-d"] diff --git a/tests/test_os_ops_common.py b/tests/test_os_ops_common.py index e4b8278..e04987b 100644 --- a/tests/test_os_ops_common.py +++ b/tests/test_os_ops_common.py @@ -19,6 +19,7 @@ import psutil import time import signal as os_signal +import dataclasses from src.exceptions import InvalidOperationException from src.exceptions import ExecUtilException @@ -27,6 +28,33 @@ from concurrent.futures import Future as ThreadFuture +@dataclasses.dataclass +class tagIP: + family: socket.AddressFamily + value: str + sign: str + + +def _enum_local_ips() -> typing.List[tagIP]: + res = subprocess.check_output(["ip", "-4", "addr", "show", "dev", "lo"]).decode() + ips = [ + tagIP( + family=socket.AddressFamily.AF_INET, + value=v, + sign="ip4={}".format(v), + ) + for v in re.findall(r"inet\s+([0-9.]+)", res)] + + ips.append( + tagIP( + family=socket.AddressFamily.AF_INET6, + value="::1", + sign="ip6=standard_localhost", + )) + + return ips + + class TestOsOpsCommon: sm_os_ops_descrs: typing.List[OsOpsDescr] = [ OsOpsDescrs.sm_local_os_ops_descr, @@ -898,6 +926,134 @@ def LOCAL_server(s: socket.socket): if ok_count == 0: raise RuntimeError("No one free port was found.") + # -------------------------------------------------------------------- + @pytest.fixture( + params=[ + pytest.param( + ip, + id=ip.sign, + ) for ip in _enum_local_ips() + ] + ) + def local_ip(self, request: pytest.FixtureRequest) -> tagIP: + assert isinstance(request, pytest.FixtureRequest) + assert type(request.param) is tagIP + return request.param + + # -------------------------------------------------------------------- + def test_is_port_available__true( + self, + os_ops: OsOperations, + local_ip: tagIP, + ): + assert isinstance(os_ops, OsOperations) + assert type(local_ip) is tagIP + + C_LIMIT = 128 + + ports = set(range(1024, 65535)) + assert type(ports) is set + + ok_count = 0 + no_count = 0 + + for port in ports: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("", port)) + except OSError: + continue + + r = os_ops.is_port_available(local_ip.value, port) + + if r: + ok_count += 1 + logging.info("OK. Port {} is available.".format(port)) + else: + no_count += 1 + logging.warning("NO. Port {} is not available.".format(port)) + + if ok_count == C_LIMIT: + return + + if no_count == C_LIMIT: + raise RuntimeError("To many false positive test attempts.") + continue + + if ok_count == 0: + raise RuntimeError("No one available port was found.") + return + + # -------------------------------------------------------------------- + def test_is_port_available__false( + self, + os_ops: OsOperations, + local_ip: tagIP, + ): + assert isinstance(os_ops, OsOperations) + assert type(local_ip) is tagIP + + C_LIMIT = 10 + + ports = set(range(1024, 65535)) + assert type(ports) is set + + def LOCAL_server(s: socket.socket): + assert s is not None + assert type(s) is socket.socket + + try: + while True: + r = s.accept() + + if r is None: + break + except Exception as e: + assert e is not None + pass + + ok_count = 0 + no_count = 0 + + for port in ports: + with socket.socket(local_ip.family, socket.SOCK_STREAM) as s: + try: + s.bind((local_ip.value, port)) + except OSError: + continue + + th = threading.Thread(target=LOCAL_server, args=[s]) + + s.listen(10) + + assert type(th) is threading.Thread + th.start() + + try: + r = os_ops.is_port_available(local_ip.value, port) + finally: + s.shutdown(2) + th.join() + + if not r: + ok_count += 1 + logging.info("OK. Port {} is not available.".format(port)) + else: + no_count += 1 + logging.warning("NO. Port {} does not accept connection.".format(port)) + + if ok_count == C_LIMIT: + return + + if no_count == C_LIMIT: + raise RuntimeError("To many false positive test attempts.") + continue + + if ok_count == 0: + raise RuntimeError("No one available port was found.") + return + + # -------------------------------------------------------------------- def test_get_tmpdir(self, os_ops: OsOperations): assert isinstance(os_ops, OsOperations)