Skip to content

Docker

erpc.docker

Docker-based eRPC process lifecycle manager.

Manages eRPC as a Docker container instead of a local binary, using the docker CLI via subprocess calls to keep dependencies light.

Examples:

Context manager (recommended):

config = ERPCConfig(upstreams={1: ["https://eth.llamarpc.com"]})
with DockerERPCProcess(config=config) as erpc:
    url = erpc.endpoint_url(1)

Manual lifecycle:

proc = DockerERPCProcess(config=config, name="my-erpc")
proc.start()
proc.wait_for_health()
...
proc.stop()

DockerERPCProcess

Manages an eRPC instance as a Docker container.

Uses the docker CLI via Python's subprocess module — no Python Docker SDK required.

Parameters:

Name Type Description Default
config ERPCConfig

eRPC configuration to mount into the container.

required
image str

Docker image to use.

_DEFAULT_IMAGE
port int

Host port to map to the eRPC server port (container port 4000).

4000
metrics_port int

Host port to map to the metrics port (container port 4001).

4001
name str | None

Optional container name for easier identification.

None

Raises:

Type Description
ERPCError

If the docker binary is not found.

Examples:

>>> config = ERPCConfig(upstreams={1: ["https://eth.llamarpc.com"]})
>>> proc = DockerERPCProcess(config=config, name="my-erpc")
>>> proc.image
'ghcr.io/erpc/erpc:latest'
Source code in erpc/docker.py
class DockerERPCProcess:
    """Manages an eRPC instance as a Docker container.

    Uses the ``docker`` CLI via :mod:`subprocess` — no Python Docker SDK required.

    An instance tracks at most one container at a time. The container ID and
    the path of the config file written for the volume mount are remembered
    until :meth:`stop` (or a failed :meth:`start`) clears them.

    Args:
        config: eRPC configuration to mount into the container.
        image: Docker image to use.
        port: Host port to map to the eRPC server port (container port 4000).
        metrics_port: Host port to map to the metrics port (container port 4001).
        name: Optional container name for easier identification.

    Raises:
        ERPCError: If the ``docker`` binary is not found.

    Examples:
        >>> config = ERPCConfig(upstreams={1: ["https://eth.llamarpc.com"]})
        >>> proc = DockerERPCProcess(config=config, name="my-erpc")
        >>> proc.image
        'ghcr.io/erpc/erpc:latest'

    """

    # ID printed by ``docker run -d``; ``None`` while no container is tracked.
    _container_id: str | None = None
    # Config file written by ``config.write()`` for the volume mount.
    _config_path: Path | None = None

    def __init__(
        self,
        config: ERPCConfig,
        image: str = _DEFAULT_IMAGE,
        port: int = 4000,
        metrics_port: int = 4001,
        name: str | None = None,
    ) -> None:
        """Initialize DockerERPCProcess.

        Args:
            config: eRPC configuration to mount into the container.
            image: Docker image to use.
            port: Host port to map to the eRPC server port.
            metrics_port: Host port to map to the metrics port.
            name: Optional container name.

        Raises:
            ERPCError: If the ``docker`` binary is not found.

        """
        # Resolve the CLI first so a missing docker install fails fast.
        self._docker = find_docker_binary()
        self.config = config
        self.image = image
        self.port = port
        self.metrics_port = metrics_port
        self.name = name

    @property
    def container_id(self) -> str | None:
        """ID of the tracked Docker container, or ``None`` if not started.

        Returns:
            Container ID string or ``None``.

        """
        return self._container_id

    @property
    def is_running(self) -> bool:
        """Whether the Docker container is currently running.

        Checks container status via ``docker inspect``. Any failure to run
        or query docker is reported as "not running".

        Returns:
            ``True`` if the container is running, ``False`` otherwise.

        """
        if not self._container_id:
            return False
        try:
            result = subprocess.run(
                [
                    self._docker,
                    "inspect",
                    "--format",
                    "{{.State.Running}}",
                    self._container_id,
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            return result.stdout.strip().lower() == "true"
        except (subprocess.CalledProcessError, OSError):
            return False

    @property
    def is_healthy(self) -> bool:
        """Whether the eRPC health endpoint responds successfully.

        Returns:
            ``True`` if the health endpoint responds with HTTP 2xx.

        """
        # NOTE(review): the URL comes from ``config.health_url``; confirm it
        # targets the host port mapped here (``self.port``) when that differs
        # from the port in the config.
        try:
            # Close the response explicitly; this property is polled
            # repeatedly and would otherwise leave sockets to the GC.
            with urlopen(self.config.health_url, timeout=2):
                pass
        except (URLError, OSError):
            return False
        return True

    @property
    def endpoint(self) -> str:
        """Base eRPC endpoint URL.

        Returns:
            HTTP URL for the eRPC server.

        """
        return f"http://{self.config.server_host}:{self.port}"

    def endpoint_url(self, chain_id: int) -> str:
        """Get the proxied endpoint URL for a specific chain.

        Args:
            chain_id: EVM chain identifier.

        Returns:
            Full URL for the proxied RPC endpoint.

        """
        return (
            f"http://{self.config.server_host}:{self.port}/{self.config.project_id}/evm/{chain_id}"
        )

    def start(self) -> None:
        """Start the eRPC Docker container.

        Writes the config to a file and runs the container detached with
        that file mounted read-only (``docker run`` pulls the image if it is
        not available locally). A previously tracked container that is no
        longer running is removed first so its name and ports can be reused.

        Raises:
            ERPCError: If the container is already running or fails to start.

        """
        if self.is_running:
            raise ERPCError("eRPC container is already running")

        if self._container_id:
            # A container from an earlier start() exited on its own. Remove
            # it (and its config file) instead of silently dropping the ID.
            self.stop(timeout=0)

        # Write config to temp file for volume mount
        self._config_path = self.config.write()

        cmd = [
            self._docker,
            "run",
            "-d",
            "-p",
            f"{self.port}:4000",
            "-p",
            f"{self.metrics_port}:4001",
            "-v",
            f"{self._config_path}:{_CONTAINER_CONFIG_PATH}:ro",
        ]

        if self.name:
            cmd.extend(["--name", self.name])

        cmd.append(self.image)

        logger.info("Starting eRPC container: %s", " ".join(cmd))
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # Do not leave the just-written config file behind on failure.
            self._cleanup()
            raise ERPCError(f"Failed to start eRPC container: {e.stderr}") from e
        except OSError as e:
            # The docker binary could not be executed at all.
            self._cleanup()
            raise ERPCError(f"Failed to start eRPC container: {e}") from e

        # ``docker run -d`` prints the new container's ID on stdout.
        self._container_id = result.stdout.strip()
        logger.info("eRPC container started (ID: %s)", self._container_id)

    def stop(self, timeout: int = 10) -> None:
        """Stop and remove the eRPC Docker container.

        Failures of ``docker stop`` / ``docker rm`` are logged as warnings
        rather than raised; local state is always reset afterwards.

        Args:
            timeout: Seconds to wait for graceful shutdown before force-killing.

        Raises:
            ERPCNotRunning: If no container is running.

        """
        if not self._container_id:
            raise ERPCNotRunning("eRPC container is not running")

        logger.info("Stopping eRPC container %s...", self._container_id)
        try:
            try:
                subprocess.run(
                    [self._docker, "stop", "--time", str(timeout), self._container_id],
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except (subprocess.CalledProcessError, OSError):
                logger.warning("docker stop failed for %s", self._container_id)

            try:
                subprocess.run(
                    [self._docker, "rm", "-f", self._container_id],
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except (subprocess.CalledProcessError, OSError):
                logger.warning("docker rm failed for %s", self._container_id)
            else:
                # Only claim success when the removal actually succeeded.
                logger.info("eRPC container stopped and removed")
        finally:
            # Always drop the temp config and tracked ID, even on interrupt.
            self._cleanup()

    def restart(self, timeout: int = 10) -> None:
        """Restart the eRPC Docker container.

        Args:
            timeout: Seconds to wait for graceful shutdown before restart.

        """
        # Key off the tracked ID rather than ``is_running`` so a container
        # that has exited is still removed before the new one is created.
        if self._container_id:
            self.stop(timeout=timeout)
        self.start()

    def wait_for_health(self, timeout: int = 30) -> None:
        """Wait for eRPC to become healthy.

        Polls :attr:`is_healthy` every 0.5 seconds until it succeeds or the
        deadline passes.

        Args:
            timeout: Maximum seconds to wait.

        Raises:
            ERPCHealthCheckError: If the health check times out.
            ERPCError: If the container stops during the health check.

        """
        logger.info("Waiting for eRPC health (timeout: %ds)...", timeout)
        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            # Fail fast instead of waiting out the timeout on a dead container.
            if self._container_id and not self.is_running:
                raise ERPCError("eRPC container stopped during health check")

            if self.is_healthy:
                logger.info("eRPC is healthy")
                return

            time.sleep(0.5)

        raise ERPCHealthCheckError(f"eRPC did not become healthy within {timeout}s")

    def logs(self, tail: int = 100) -> str:
        """Fetch recent container logs.

        Args:
            tail: Number of recent log lines to retrieve.

        Returns:
            Container log output as a string (the stdout of ``docker logs``).

        Raises:
            ERPCNotRunning: If no container exists to fetch logs from.
            subprocess.CalledProcessError: If ``docker logs`` exits non-zero.

        """
        if not self._container_id:
            raise ERPCNotRunning("eRPC container is not running")

        # NOTE(review): only stdout is returned; confirm eRPC does not write
        # its logs to stderr, which ``docker logs`` relays separately.
        result = subprocess.run(
            [self._docker, "logs", "--tail", str(tail), self._container_id],
            capture_output=True,
            text=True,
            check=True,
        )
        return result.stdout

    def _cleanup(self) -> None:
        """Clean up temporary config files and reset state."""
        if self._config_path and self._config_path.exists():
            import contextlib

            # Best effort: a file that cannot be deleted must not mask the
            # caller's own error or block the state reset below.
            with contextlib.suppress(OSError):
                self._config_path.unlink()
        self._config_path = None
        self._container_id = None

    def __enter__(self) -> DockerERPCProcess:
        """Start eRPC container and wait for health on context entry.

        If the health wait fails, the container is stopped and removed
        before the exception propagates.
        """
        self.start()
        try:
            self.wait_for_health()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so tear down
            # here or the container would keep running unattended.
            if self._container_id:
                self.stop()
            raise
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Stop and remove the eRPC container on context exit."""
        # Use the tracked ID, not ``is_running``: a container that crashed
        # inside the ``with`` body still needs removing, as does its config.
        if self._container_id:
            self.stop()

container_id property

container_id: str | None

ID of the running Docker container, or None if not started.

Returns:

Type Description
str | None

Container ID string or None.

is_running property

is_running: bool

Whether the Docker container is currently running.

Checks container status via docker inspect.

Returns:

Type Description
bool

True if the container is running, False otherwise.

is_healthy property

is_healthy: bool

Whether the eRPC health endpoint responds successfully.

Returns:

Type Description
bool

True if the health endpoint responds with HTTP 2xx.

endpoint property

endpoint: str

Base eRPC endpoint URL.

Returns:

Type Description
str

HTTP URL for the eRPC server.

__init__

__init__(config: ERPCConfig, image: str = _DEFAULT_IMAGE, port: int = 4000, metrics_port: int = 4001, name: str | None = None) -> None

Initialize DockerERPCProcess.

Parameters:

Name Type Description Default
config ERPCConfig

eRPC configuration to mount into the container.

required
image str

Docker image to use.

_DEFAULT_IMAGE
port int

Host port to map to the eRPC server port.

4000
metrics_port int

Host port to map to the metrics port.

4001
name str | None

Optional container name.

None

Raises:

Type Description
ERPCError

If the docker binary is not found.

Source code in erpc/docker.py
def __init__(
    self,
    config: ERPCConfig,
    image: str = _DEFAULT_IMAGE,
    port: int = 4000,
    metrics_port: int = 4001,
    name: str | None = None,
) -> None:
    """Set up a Docker-backed eRPC process without starting it.

    Args:
        config: eRPC configuration to mount into the container.
        image: Docker image to use.
        port: Host port to map to the eRPC server port.
        metrics_port: Host port to map to the metrics port.
        name: Optional container name.

    Raises:
        ERPCError: If the ``docker`` binary is not found.

    """
    # Locate the CLI before storing anything so a missing docker fails fast.
    self._docker = find_docker_binary()
    self.config, self.image = config, image
    self.port, self.metrics_port = port, metrics_port
    self.name = name

endpoint_url

endpoint_url(chain_id: int) -> str

Get the proxied endpoint URL for a specific chain.

Parameters:

Name Type Description Default
chain_id int

EVM chain identifier.

required

Returns:

Type Description
str

Full URL for the proxied RPC endpoint.

Source code in erpc/docker.py
def endpoint_url(self, chain_id: int) -> str:
    """Build the proxied RPC URL for one chain.

    Args:
        chain_id: EVM chain identifier.

    Returns:
        Full URL for the proxied RPC endpoint, in the form
        ``http://<host>:<port>/<project_id>/evm/<chain_id>``.

    """
    settings = self.config
    origin = f"http://{settings.server_host}:{self.port}"
    return f"{origin}/{settings.project_id}/evm/{chain_id}"

start

start() -> None

Start the eRPC Docker container.

Pulls the image if needed, writes the config to a temp file, and runs the container with the config mounted as a volume.

Raises:

Type Description
ERPCError

If the container is already running or fails to start.

Source code in erpc/docker.py
def start(self) -> None:
    """Start the eRPC Docker container.

    Writes the config to a file and runs the container detached with
    that file mounted read-only (``docker run`` pulls the image if it is
    not available locally). A previously tracked container that is no
    longer running is removed first so its name and ports can be reused.

    Raises:
        ERPCError: If the container is already running or fails to start.

    """
    if self.is_running:
        raise ERPCError("eRPC container is already running")

    if self._container_id:
        # A container from an earlier start() exited on its own. Remove
        # it (and its config file) instead of silently dropping the ID.
        self.stop(timeout=0)

    # Write config to temp file for volume mount
    self._config_path = self.config.write()

    cmd = [
        self._docker,
        "run",
        "-d",
        "-p",
        f"{self.port}:4000",
        "-p",
        f"{self.metrics_port}:4001",
        "-v",
        f"{self._config_path}:{_CONTAINER_CONFIG_PATH}:ro",
    ]

    if self.name:
        cmd.extend(["--name", self.name])

    cmd.append(self.image)

    logger.info("Starting eRPC container: %s", " ".join(cmd))
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # Do not leave the just-written config file behind on failure.
        self._cleanup()
        raise ERPCError(f"Failed to start eRPC container: {e.stderr}") from e
    except OSError as e:
        # The docker binary could not be executed at all.
        self._cleanup()
        raise ERPCError(f"Failed to start eRPC container: {e}") from e

    # ``docker run -d`` prints the new container's ID on stdout.
    self._container_id = result.stdout.strip()
    logger.info("eRPC container started (ID: %s)", self._container_id)

stop

stop(timeout: int = 10) -> None

Stop and remove the eRPC Docker container.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for graceful shutdown before force-killing.

10

Raises:

Type Description
ERPCNotRunning

If no container is running.

Source code in erpc/docker.py
def stop(self, timeout: int = 10) -> None:
    """Stop and remove the eRPC Docker container.

    Failures of ``docker stop`` / ``docker rm`` are logged as warnings
    rather than raised; local state is always reset afterwards.

    Args:
        timeout: Seconds to wait for graceful shutdown before force-killing.

    Raises:
        ERPCNotRunning: If no container is running.

    """
    if not self._container_id:
        raise ERPCNotRunning("eRPC container is not running")

    logger.info("Stopping eRPC container %s...", self._container_id)
    try:
        try:
            subprocess.run(
                [self._docker, "stop", "--time", str(timeout), self._container_id],
                capture_output=True,
                text=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            logger.warning("docker stop failed for %s", self._container_id)

        try:
            subprocess.run(
                [self._docker, "rm", "-f", self._container_id],
                capture_output=True,
                text=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            logger.warning("docker rm failed for %s", self._container_id)
        else:
            # Only claim success when the removal actually succeeded.
            logger.info("eRPC container stopped and removed")
    finally:
        # Always drop the temp config and tracked ID, even on interrupt.
        self._cleanup()

restart

restart(timeout: int = 10) -> None

Restart the eRPC Docker container.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for graceful shutdown before restart.

10
Source code in erpc/docker.py
def restart(self, timeout: int = 10) -> None:
    """Restart the eRPC Docker container.

    Any tracked container is stopped and removed first, whether it is
    still running or has already exited, and then a new one is started.

    Args:
        timeout: Seconds to wait for graceful shutdown before restart.

    """
    # Key off the tracked ID rather than ``is_running``: an exited container
    # must still be removed, otherwise its ID is overwritten by start() and
    # the old container (and its name) is left behind.
    if self._container_id:
        self.stop(timeout=timeout)
    self.start()

wait_for_health

wait_for_health(timeout: int = 30) -> None

Wait for eRPC to become healthy.

Parameters:

Name Type Description Default
timeout int

Maximum seconds to wait.

30

Raises:

Type Description
ERPCHealthCheckError

If the health check times out.

ERPCError

If the container stops during the health check.

Source code in erpc/docker.py
def wait_for_health(self, timeout: int = 30) -> None:
    """Block until the eRPC health endpoint answers or the timeout elapses.

    Polls every 0.5 seconds.

    Args:
        timeout: Maximum seconds to wait.

    Raises:
        ERPCHealthCheckError: If the health check times out.
        ERPCError: If the container stops during the health check.

    """
    logger.info("Waiting for eRPC health (timeout: %ds)...", timeout)
    give_up_at = time.monotonic() + timeout

    while time.monotonic() < give_up_at:
        # A tracked container that is no longer running will never turn
        # healthy, so abort right away instead of waiting out the timeout.
        container_died = bool(self._container_id) and not self.is_running
        if container_died:
            raise ERPCError("eRPC container stopped during health check")

        if not self.is_healthy:
            time.sleep(0.5)
            continue

        logger.info("eRPC is healthy")
        return

    raise ERPCHealthCheckError(f"eRPC did not become healthy within {timeout}s")

logs

logs(tail: int = 100) -> str

Fetch recent container logs.

Parameters:

Name Type Description Default
tail int

Number of recent log lines to retrieve.

100

Returns:

Type Description
str

Container log output as a string.

Raises:

Type Description
ERPCNotRunning

If no container exists to fetch logs from.

Source code in erpc/docker.py
def logs(self, tail: int = 100) -> str:
    """Return the most recent log output of the tracked container.

    Args:
        tail: Number of recent log lines to retrieve.

    Returns:
        Container log output as a string (the stdout of ``docker logs``).

    Raises:
        ERPCNotRunning: If no container exists to fetch logs from.

    """
    container = self._container_id
    if not container:
        raise ERPCNotRunning("eRPC container is not running")

    argv = [self._docker, "logs", "--tail", str(tail), container]
    # check=True: a non-zero docker exit surfaces as CalledProcessError.
    completed = subprocess.run(argv, check=True, text=True, capture_output=True)
    return completed.stdout

__enter__

__enter__() -> DockerERPCProcess

Start eRPC container and wait for health on context entry.

Source code in erpc/docker.py
def __enter__(self) -> DockerERPCProcess:
    """Start eRPC container and wait for health on context entry.

    If the health wait fails, the container is stopped and removed
    before the exception propagates.
    """
    self.start()
    try:
        self.wait_for_health()
    except BaseException:
        # __exit__ is not called when __enter__ raises, so tear down
        # here or the container would keep running unattended.
        if self._container_id:
            self.stop()
        raise
    return self

__exit__

__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Stop eRPC container on context exit.

Source code in erpc/docker.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Stop and remove the eRPC container on context exit.

    Exceptions raised inside the ``with`` body are not suppressed.
    """
    # Use the tracked ID, not ``is_running``: a container that crashed
    # inside the ``with`` body still needs removing, as does its config.
    if self._container_id:
        self.stop()

find_docker_binary

find_docker_binary() -> str

Locate the docker CLI binary on PATH.

Returns:

Type Description
str

Absolute path to the docker binary.

Raises:

Type Description
ERPCError

If docker is not installed or not on PATH.

Source code in erpc/docker.py
def find_docker_binary() -> str:
    """Resolve the ``docker`` executable by searching PATH.

    Returns:
        Path to the docker binary as reported by :func:`shutil.which`.

    Raises:
        ERPCError: If docker is not installed or not on PATH.

    """
    docker_path = shutil.which("docker")
    if not docker_path:
        raise ERPCError(
            "docker CLI not found on PATH. Install Docker to use DockerERPCProcess. "
            "See: https://docs.docker.com/get-docker/"
        )
    return docker_path