Skip to content

Process

erpc.process

eRPC process lifecycle manager.

Inspired by the BaseGethProcess pattern from py-geth (https://github.com/ethereum/py-geth).

ERPC_BINARY module-attribute

ERPC_BINARY = 'erpc'

Default binary name to search for on PATH.

ERPCProcess

Manages an eRPC instance as a subprocess.

Supports both context-manager and manual lifecycle patterns.

Examples:

Context manager (recommended)::

with ERPCProcess(upstreams={1: ["https://..."]}) as erpc:
    url = erpc.endpoint_url(1)

Manual lifecycle::

erpc = ERPCProcess(config=my_config)
erpc.start()
erpc.wait_for_health()
...
erpc.stop()
Source code in erpc/process.py
class ERPCProcess:
    """Manage an eRPC instance as a subprocess.

    Supports both context-manager and manual lifecycle patterns.

    Note:
        ``stdout`` and ``stderr`` default to ``subprocess.PIPE`` and nothing in
        this class drains them while the process runs (``stderr`` is only read
        when the process exits right after launch). For long-running, chatty
        instances pass ``subprocess.DEVNULL`` or a real file descriptor so the
        child cannot stall on a full pipe buffer.

    Examples:
        Context manager (recommended)::

            with ERPCProcess(upstreams={1: ["https://..."]}) as erpc:
                url = erpc.endpoint_url(1)

        Manual lifecycle::

            erpc = ERPCProcess(config=my_config)
            erpc.start()
            erpc.wait_for_health()
            ...
            erpc.stop()

    """

    # Class-level defaults so the lifecycle helpers work on an instance whose
    # start() has never been called.
    _proc: subprocess.Popen[bytes] | None = None
    _config_path: Path | None = None
    _client: ERPCClient | None = None

    def __init__(
        self,
        config: ERPCConfig | None = None,
        upstreams: dict[int, list[str]] | None = None,
        binary_path: str | None = None,
        stdin: int = subprocess.DEVNULL,
        stdout: int = subprocess.PIPE,
        stderr: int = subprocess.PIPE,
    ) -> None:
        """Initialize ERPCProcess.

        Provide either a full ``ERPCConfig`` or just ``upstreams`` for quick setup.

        Args:
            config: Full eRPC configuration. Takes precedence over ``upstreams``.
            upstreams: Quick setup — mapping of chain_id to endpoint URLs.
            binary_path: Path to eRPC binary (auto-detected if not set).
            stdin: Subprocess stdin file descriptor.
            stdout: Subprocess stdout file descriptor.
            stderr: Subprocess stderr file descriptor.

        Raises:
            ValueError: If neither ``config`` nor ``upstreams`` is provided.
            ERPCNotFound: If the eRPC binary cannot be located.

        """
        if config is not None:
            self.config = config
        elif upstreams is not None:
            self.config = ERPCConfig(upstreams=upstreams)
        else:
            raise ValueError("Provide either config or upstreams")

        self.binary = find_erpc_binary(binary_path)
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

    @property
    def is_running(self) -> bool:
        """Whether the eRPC subprocess is currently running."""
        return self._proc is not None and self._proc.poll() is None

    @property
    def is_alive(self) -> bool:
        """Whether the process is running AND the health endpoint responds."""
        return self.is_running and self.is_healthy

    @property
    def is_healthy(self) -> bool:
        """Whether the eRPC health endpoint responds successfully.

        Any ``URLError`` or ``OSError`` raised by the probe (2 second timeout)
        is reported as unhealthy rather than propagated.
        """
        try:
            # Use the response as a context manager so its socket is released
            # on every probe; wait_for_health() polls this repeatedly.
            with urlopen(self.config.health_url, timeout=2):
                return True
        except (URLError, OSError):
            return False

    @property
    def pid(self) -> int | None:
        """PID of the running eRPC process, or ``None``."""
        return self._proc.pid if self._proc else None

    @property
    def endpoint(self) -> str:
        """Base eRPC endpoint URL."""
        return f"http://{self.config.server_host}:{self.config.server_port}"

    @property
    def client(self) -> ERPCClient:
        """Return an :class:`~erpc.client.ERPCClient` pointed at this instance.

        The client is lazily created on first access and then cached on the
        instance.

        Returns:
            A client built from this instance's base endpoint URL and the
            configured metrics port.

        """
        if self._client is None:
            # Imported lazily, at first use, rather than at module import time.
            from erpc.client import ERPCClient

            self._client = ERPCClient(
                base_url=self.endpoint,
                metrics_port=self.config.metrics_port,
            )
        return self._client

    def endpoint_url(self, chain_id: int) -> str:
        """Get the proxied endpoint URL for a specific chain.

        Args:
            chain_id: EVM chain identifier.

        Returns:
            Full URL for the proxied RPC endpoint.

        """
        return self.config.endpoint_url(chain_id)

    def start(self) -> None:
        """Start the eRPC process.

        Writes the configuration to disk, launches the binary with the config
        path as its only argument, and checks after a short delay that the
        process did not exit straight away.

        Raises:
            ERPCStartupError: If the process fails to start or is already running.

        """
        if self.is_running:
            raise ERPCStartupError("eRPC is already running")

        # A previous process may have died without stop() being called. Release
        # its config file and pipes before the handles are overwritten below.
        self._cleanup()

        self._config_path = self.config.write()
        command = [self.binary, str(self._config_path)]

        logger.info("Starting eRPC: %s", " ".join(command))
        try:
            self._proc = subprocess.Popen(
                command,
                stdin=self.stdin,
                stdout=self.stdout,
                stderr=self.stderr,
            )
        except OSError as e:
            # Nothing was launched, so the freshly written config is orphaned.
            self._remove_config()
            raise ERPCStartupError(f"Failed to start eRPC: {e}") from e

        # Quick check — did it crash immediately?
        time.sleep(0.1)
        if self._proc.poll() is not None:
            stderr_output = ""
            if self._proc.stderr:
                stderr_output = self._proc.stderr.read().decode(errors="replace")
            # The config file is useless now; the dead process handle is kept
            # so that a follow-up stop() still finds it and finishes cleanup.
            self._remove_config()
            raise ERPCStartupError(
                f"eRPC exited immediately (code {self._proc.returncode}): {stderr_output}"
            )

        logger.info("eRPC started (PID %s)", self._proc.pid)

    def stop(self, timeout: int = 5) -> None:
        """Stop the eRPC process gracefully.

        Sends ``SIGTERM``, waits up to ``timeout`` seconds, then ``SIGKILL``.
        If the process has already exited, only the cleanup is performed.

        Args:
            timeout: Seconds to wait for graceful shutdown.

        Raises:
            ERPCNotRunning: If no process handle exists (never started, or
                already stopped and cleaned up).

        """
        if not self._proc:
            raise ERPCNotRunning("eRPC is not running")

        if self._proc.poll() is not None:
            logger.info("eRPC already stopped")
            self._cleanup()
            return

        logger.info("Stopping eRPC (PID %s)...", self._proc.pid)
        self._proc.send_signal(signal.SIGTERM)

        try:
            self._proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            logger.warning("eRPC did not stop gracefully, sending SIGKILL")
            self._proc.kill()
            self._proc.wait(timeout=5)

        logger.info("eRPC stopped")
        self._cleanup()

    def restart(self, timeout: int = 5) -> None:
        """Restart the eRPC process.

        Args:
            timeout: Seconds to wait for graceful shutdown before restart.

        """
        if self.is_running:
            self.stop(timeout=timeout)
        self.start()

    def wait_for_health(self, timeout: int = 30) -> None:
        """Wait for eRPC to become healthy.

        Polls the health endpoint every half second until it responds or the
        deadline passes.

        Args:
            timeout: Maximum seconds to wait.

        Raises:
            ERPCHealthCheckError: If health check times out.
            ERPCStartupError: If the process dies during startup.

        """
        logger.info("Waiting for eRPC health (timeout: %ds)...", timeout)
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            if not self.is_running:
                raise ERPCStartupError("eRPC process died during health check")

            if self.is_healthy:
                logger.info("eRPC is healthy")
                return

            time.sleep(0.5)

        raise ERPCHealthCheckError(f"eRPC did not become healthy within {timeout}s")

    def _remove_config(self) -> None:
        """Delete the written config file, if any, and forget its path."""
        config_path, self._config_path = self._config_path, None
        if config_path is not None:
            # Best effort: a missing or undeletable file must not mask the
            # caller's real error.
            with contextlib.suppress(OSError):
                config_path.unlink()

    def _cleanup(self) -> None:
        """Release everything tied to the last launch.

        Removes the temporary config file, closes any pipe handles still open
        on the process object, and drops the process handle.
        """
        self._remove_config()

        proc, self._proc = self._proc, None
        if proc is not None:
            # PIPE handles stay open until closed explicitly; close them here
            # so repeated start/stop cycles do not accumulate descriptors.
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is not None:
                    with contextlib.suppress(OSError):
                        stream.close()

    def __enter__(self) -> ERPCProcess:
        """Start eRPC and wait for health on context entry."""
        self.start()
        try:
            self.wait_for_health()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the subprocess
            # and its config file would be left behind without this.
            with contextlib.suppress(Exception):
                if self._proc is not None:
                    self.stop()
            raise
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Stop eRPC on context exit.

        Exceptions raised inside the ``with`` body are never suppressed.
        """
        # Check the handle rather than is_running: stop() also cleans up after
        # a process that died on its own while the context was active.
        if self._proc is not None:
            self.stop()

is_running property

is_running: bool

Whether the eRPC subprocess is currently running.

is_alive property

is_alive: bool

Whether the process is running AND the health endpoint responds.

is_healthy property

is_healthy: bool

Whether the eRPC health endpoint responds successfully.

pid property

pid: int | None

PID of the running eRPC process, or None.

endpoint property

endpoint: str

Base eRPC endpoint URL.

client property

client: ERPCClient

Return an ERPCClient (erpc.client.ERPCClient) pointed at this instance.

The client is lazily created and cached for the lifetime of the process.

Returns:

Type Description
ERPCClient

An HTTP client configured with this instance's server and metrics URLs.

__init__

__init__(config: ERPCConfig | None = None, upstreams: dict[int, list[str]] | None = None, binary_path: str | None = None, stdin: int = subprocess.DEVNULL, stdout: int = subprocess.PIPE, stderr: int = subprocess.PIPE) -> None

Initialize ERPCProcess.

Provide either a full ERPCConfig or just upstreams for quick setup.

Parameters:

Name Type Description Default
config ERPCConfig | None

Full eRPC configuration. Takes precedence over upstreams.

None
upstreams dict[int, list[str]] | None

Quick setup — mapping of chain_id to endpoint URLs.

None
binary_path str | None

Path to eRPC binary (auto-detected if not set).

None
stdin int

Subprocess stdin file descriptor.

DEVNULL
stdout int

Subprocess stdout file descriptor.

PIPE
stderr int

Subprocess stderr file descriptor.

PIPE

Raises:

Type Description
ValueError

If neither config nor upstreams is provided.

ERPCNotFound

If the eRPC binary cannot be located.

Source code in erpc/process.py
def __init__(
    self,
    config: ERPCConfig | None = None,
    upstreams: dict[int, list[str]] | None = None,
    binary_path: str | None = None,
    stdin: int = subprocess.DEVNULL,
    stdout: int = subprocess.PIPE,
    stderr: int = subprocess.PIPE,
) -> None:
    """Set up an eRPC process wrapper without launching anything yet.

    Either a complete ``ERPCConfig`` or a bare ``upstreams`` mapping must be
    supplied; when both are given the ``config`` wins.

    Args:
        config: Full eRPC configuration. Takes precedence over ``upstreams``.
        upstreams: Quick setup — mapping of chain_id to endpoint URLs.
        binary_path: Path to eRPC binary (auto-detected if not set).
        stdin: Subprocess stdin file descriptor.
        stdout: Subprocess stdout file descriptor.
        stderr: Subprocess stderr file descriptor.

    Raises:
        ValueError: If neither ``config`` nor ``upstreams`` is provided.
        ERPCNotFound: If the eRPC binary cannot be located.

    """
    # Guard clause: reject the "nothing supplied" case before doing any work.
    if config is None and upstreams is None:
        raise ValueError("Provide either config or upstreams")

    # An explicit config is used as-is; otherwise build one from the mapping.
    self.config = config if config is not None else ERPCConfig(upstreams=upstreams)

    # Resolve the binary before storing the stream settings, so a lookup
    # failure leaves those attributes unset.
    self.binary = find_erpc_binary(binary_path)
    self.stdin, self.stdout, self.stderr = stdin, stdout, stderr

endpoint_url

endpoint_url(chain_id: int) -> str

Get the proxied endpoint URL for a specific chain.

Parameters:

Name Type Description Default
chain_id int

EVM chain identifier.

required

Returns:

Type Description
str

Full URL for the proxied RPC endpoint.

Source code in erpc/process.py
def endpoint_url(self, chain_id: int) -> str:
    """Return the proxied RPC URL for one chain.

    The lookup is delegated entirely to the configuration object.

    Args:
        chain_id: EVM chain identifier.

    Returns:
        Full URL for the proxied RPC endpoint.

    """
    proxied_url = self.config.endpoint_url(chain_id)
    return proxied_url

start

start() -> None

Start the eRPC process.

Raises:

Type Description
ERPCStartupError

If the process fails to start or is already running.

Source code in erpc/process.py
def start(self) -> None:
    """Start the eRPC process.

    Writes the configuration to disk, launches the binary with the config
    path as its only argument, and checks after a short delay that the
    process did not exit straight away.

    Raises:
        ERPCStartupError: If the process fails to start or is already running.

    """
    if self.is_running:
        raise ERPCStartupError("eRPC is already running")

    # A previous process may have died without stop() being called. Release
    # its config file and handle before they are overwritten below.
    self._cleanup()

    self._config_path = self.config.write()
    command = [self.binary, str(self._config_path)]

    logger.info("Starting eRPC: %s", " ".join(command))
    try:
        self._proc = subprocess.Popen(
            command,
            stdin=self.stdin,
            stdout=self.stdout,
            stderr=self.stderr,
        )
    except OSError as e:
        # Nothing was launched, so the freshly written config is orphaned.
        self._cleanup()
        raise ERPCStartupError(f"Failed to start eRPC: {e}") from e

    # Quick check — did it crash immediately?
    time.sleep(0.1)
    if self._proc.poll() is not None:
        stderr_output = ""
        if self._proc.stderr:
            stderr_output = self._proc.stderr.read().decode(errors="replace")
        # The config file is useless now; remove it (best effort) but keep the
        # dead process handle so a follow-up stop() still finds it.
        config_path, self._config_path = self._config_path, None
        if config_path is not None:
            with contextlib.suppress(OSError):
                config_path.unlink()
        raise ERPCStartupError(
            f"eRPC exited immediately (code {self._proc.returncode}): {stderr_output}"
        )

    logger.info("eRPC started (PID %s)", self._proc.pid)

stop

stop(timeout: int = 5) -> None

Stop the eRPC process gracefully.

Sends SIGTERM, waits up to timeout seconds, then SIGKILL.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for graceful shutdown.

5

Raises:

Type Description
ERPCNotRunning

If the process is not running.

Source code in erpc/process.py
def stop(self, timeout: int = 5) -> None:
    """Shut the eRPC process down, escalating if it does not comply.

    Sends ``SIGTERM``, waits up to ``timeout`` seconds, then ``SIGKILL``.
    A process that has already exited is only cleaned up.

    Args:
        timeout: Seconds to wait for graceful shutdown.

    Raises:
        ERPCNotRunning: If the process is not running.

    """
    proc = self._proc
    if not proc:
        raise ERPCNotRunning("eRPC is not running")

    if proc.poll() is None:
        # Still alive: ask politely first.
        logger.info("Stopping eRPC (PID %s)...", proc.pid)
        proc.send_signal(signal.SIGTERM)

        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Grace period elapsed; force termination and reap the process.
            logger.warning("eRPC did not stop gracefully, sending SIGKILL")
            proc.kill()
            proc.wait(timeout=5)

        logger.info("eRPC stopped")
    else:
        logger.info("eRPC already stopped")

    # Both branches end by releasing the config file and process handle.
    self._cleanup()

restart

restart(timeout: int = 5) -> None

Restart the eRPC process.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for graceful shutdown before restart.

5
Source code in erpc/process.py
def restart(self, timeout: int = 5) -> None:
    """Stop the eRPC process if it is up, then start it again.

    Args:
        timeout: Seconds to wait for graceful shutdown before restart.

    """
    currently_up = self.is_running
    if currently_up:
        self.stop(timeout=timeout)

    # Started unconditionally, whether or not anything had to be stopped.
    self.start()

wait_for_health

wait_for_health(timeout: int = 30) -> None

Wait for eRPC to become healthy.

Parameters:

Name Type Description Default
timeout int

Maximum seconds to wait.

30

Raises:

Type Description
ERPCHealthCheckError

If health check times out.

ERPCStartupError

If the process dies during startup.

Source code in erpc/process.py
def wait_for_health(self, timeout: int = 30) -> None:
    """Block until the eRPC health endpoint responds.

    The endpoint is polled with a half-second pause between attempts until
    it answers or the deadline passes.

    Args:
        timeout: Maximum seconds to wait.

    Raises:
        ERPCHealthCheckError: If health check times out.
        ERPCStartupError: If the process dies during startup.

    """
    logger.info("Waiting for eRPC health (timeout: %ds)...", timeout)
    give_up_at = time.monotonic() + timeout

    while True:
        # The deadline is checked first, so a zero timeout never probes.
        if time.monotonic() >= give_up_at:
            break

        # No point polling the endpoint of a process that has exited.
        if not self.is_running:
            raise ERPCStartupError("eRPC process died during health check")

        if self.is_healthy:
            logger.info("eRPC is healthy")
            return

        time.sleep(0.5)

    raise ERPCHealthCheckError(f"eRPC did not become healthy within {timeout}s")

__enter__

__enter__() -> ERPCProcess

Start eRPC and wait for health on context entry.

Source code in erpc/process.py
def __enter__(self) -> ERPCProcess:
    """Start eRPC and wait for health on context entry.

    If the health wait fails, the process that was just started is stopped
    again before the error propagates.
    """
    self.start()
    try:
        self.wait_for_health()
    except BaseException:
        # __exit__ is not invoked when __enter__ raises, so the subprocess and
        # its config file would be left behind without this. stop() also
        # handles a process that has already died.
        with contextlib.suppress(Exception):
            if self._proc is not None:
                self.stop()
        raise
    return self

__exit__

__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Stop eRPC on context exit.

Source code in erpc/process.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Stop eRPC on context exit.

    Exceptions raised inside the ``with`` body are never suppressed.
    """
    # Check the handle rather than is_running: stop() also cleans up the
    # config file after a process that died on its own inside the context.
    if self._proc is not None:
        self.stop()

find_erpc_binary

find_erpc_binary(binary_path: str | None = None) -> str

Locate the eRPC binary.

Checks (in order):

  1. Explicit binary_path argument.
  2. ERPC_BINARY environment variable.
  3. Common install locations (/usr/local/bin, /usr/bin).
  4. System PATH via shutil.which.

Parameters:

Name Type Description Default
binary_path str | None

Explicit path to the eRPC binary.

None

Returns:

Type Description
str

Absolute path to the eRPC binary.

Raises:

Type Description
ERPCNotFound

If the binary cannot be located anywhere.

Source code in erpc/process.py
def find_erpc_binary(binary_path: str | None = None) -> str:
    """Locate the eRPC binary.

    Checks (in order):

    1. Explicit ``binary_path`` argument.
    2. ``ERPC_BINARY`` environment variable.
    3. Common install locations (``/usr/local/bin``, ``/usr/bin``).
    4. System ``PATH`` via :func:`shutil.which`.

    A candidate that is not an existing executable file is skipped, so a bad
    explicit path falls through to the later lookups instead of failing.

    Args:
        binary_path: Explicit path to the eRPC binary.

    Returns:
        Absolute path to the eRPC binary.

    Raises:
        ERPCNotFound: If the binary cannot be located anywhere.

    """
    candidates: list[str] = []

    if binary_path:
        candidates.append(binary_path)

    env_binary = os.environ.get("ERPC_BINARY")
    if env_binary:
        candidates.append(env_binary)

    candidates.extend(
        [
            "/usr/local/bin/erpc",
            "/usr/local/bin/erpc-server",
            "/usr/bin/erpc",
        ]
    )

    for candidate in candidates:
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            # Honour the documented contract: a relative candidate is resolved
            # now, so the result stays valid if the working directory changes
            # before the binary is launched.
            return os.path.abspath(candidate)

    found = shutil.which(ERPC_BINARY) or shutil.which("erpc-server")
    if found:
        # shutil.which can yield a relative path when PATH has relative entries.
        return os.path.abspath(found)

    raise ERPCNotFound(
        "eRPC binary not found. Install it or set ERPC_BINARY env var. "
        "See: https://github.com/erpc/erpc"
    )