Skip to content

Dynamic

erpc.dynamic

Dynamic configuration updates for eRPC processes.

Supports runtime config changes via stop -> rewrite -> start cycle, since eRPC does not support SIGHUP-based config reload natively.

Examples:

Update the full config::

from erpc.dynamic import update_config

diff = update_config(process, new_config)
print(diff)

Hot-add an upstream::

from erpc.dynamic import add_upstream

add_upstream(process, chain_id=137, endpoint="https://polygon.llamarpc.com")

ConfigDiff dataclass

Tracks what changed between two eRPC configurations.

Attributes:

Name Type Description
added_upstreams dict[int, list[str]]

Chain IDs and their endpoints that were added entirely.

removed_upstreams dict[int, list[str]]

Chain IDs and their endpoints that were removed entirely.

added_endpoints dict[int, list[str]]

New endpoints added to existing chains.

removed_endpoints dict[int, list[str]]

Endpoints removed from existing chains.

changed_fields list[str]

List of scalar field names that changed.

Examples:

>>> diff = ConfigDiff(added_upstreams={137: ["https://polygon.example.com"]})
>>> diff.has_changes
True
Source code in erpc/dynamic.py
@dataclass
class ConfigDiff:
    """Tracks what changed between two eRPC configurations.

    Attributes:
        added_upstreams: Chain IDs and their endpoints that were added entirely.
        removed_upstreams: Chain IDs and their endpoints that were removed entirely.
        added_endpoints: New endpoints added to existing chains.
        removed_endpoints: Endpoints removed from existing chains.
        changed_fields: List of scalar field names that changed.

    Examples:
        >>> diff = ConfigDiff(added_upstreams={137: ["https://polygon.example.com"]})
        >>> diff.has_changes
        True

    """

    added_upstreams: dict[int, list[str]] = field(default_factory=dict)
    removed_upstreams: dict[int, list[str]] = field(default_factory=dict)
    added_endpoints: dict[int, list[str]] = field(default_factory=dict)
    removed_endpoints: dict[int, list[str]] = field(default_factory=dict)
    changed_fields: list[str] = field(default_factory=list)

    @property
    def has_changes(self) -> bool:
        """Whether any differences were detected.

        Returns:
            ``True`` if at least one field differs between configs.

        """
        return bool(
            self.added_upstreams
            or self.removed_upstreams
            or self.added_endpoints
            or self.removed_endpoints
            or self.changed_fields
        )

    def __str__(self) -> str:
        """Human-readable summary of config changes.

        Returns:
            Multi-line string describing all detected changes.

        """
        parts: list[str] = []
        if self.added_upstreams:
            chains = ", ".join(str(c) for c in self.added_upstreams)
            parts.append(f"Added chains: {chains}")
        if self.removed_upstreams:
            chains = ", ".join(str(c) for c in self.removed_upstreams)
            parts.append(f"Removed chains: {chains}")
        if self.added_endpoints:
            for chain_id, eps in self.added_endpoints.items():
                parts.append(f"Added endpoints on chain {chain_id}: {eps}")
        if self.removed_endpoints:
            for chain_id, eps in self.removed_endpoints.items():
                parts.append(f"Removed endpoints on chain {chain_id}: {eps}")
        if self.changed_fields:
            joined = ", ".join(self.changed_fields)
            parts.append(f"Changed fields: {joined}")
        return "\n".join(parts) if parts else "No changes"

has_changes property

has_changes: bool

Whether any differences were detected.

Returns:

Type Description
bool

True if at least one field differs between configs.

__str__

__str__() -> str

Human-readable summary of config changes.

Returns:

Type Description
str

Multi-line string describing all detected changes.

Source code in erpc/dynamic.py
def __str__(self) -> str:
    """Human-readable summary of config changes.

    Returns:
        Multi-line string describing all detected changes.

    """
    parts: list[str] = []
    if self.added_upstreams:
        chains = ", ".join(str(c) for c in self.added_upstreams)
        parts.append(f"Added chains: {chains}")
    if self.removed_upstreams:
        chains = ", ".join(str(c) for c in self.removed_upstreams)
        parts.append(f"Removed chains: {chains}")
    if self.added_endpoints:
        for chain_id, eps in self.added_endpoints.items():
            parts.append(f"Added endpoints on chain {chain_id}: {eps}")
    if self.removed_endpoints:
        for chain_id, eps in self.removed_endpoints.items():
            parts.append(f"Removed endpoints on chain {chain_id}: {eps}")
    if self.changed_fields:
        joined = ", ".join(self.changed_fields)
        parts.append(f"Changed fields: {joined}")
    return "\n".join(parts) if parts else "No changes"

atomic_write_config

atomic_write_config(config: ERPCConfig, path: Path) -> Path

Write config to a file atomically (write to temp, then rename).

Parameters:

Name Type Description Default
config ERPCConfig

The eRPC configuration to write.

required
path Path

Target file path.

required

Returns:

Type Description
Path

The path the config was written to.

Source code in erpc/dynamic.py
def atomic_write_config(config: ERPCConfig, path: Path) -> Path:
    """Write config to a file atomically (write to temp, then rename).

    Args:
        config: The eRPC configuration to write.
        path: Target file path.

    Returns:
        The path the config was written to.

    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    fd, tmp_path = tempfile.mkstemp(
        suffix=".yaml",
        prefix=".erpc-tmp-",
        dir=str(path.parent),
    )
    try:
        with os.fdopen(fd, "w") as f:
            f.write(config.to_yaml())
        os.replace(tmp_path, str(path))
    except BaseException:
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)
        raise

    return path

update_config

update_config(process: ERPCProcess, new_config: ERPCConfig) -> ConfigDiff

Update a running eRPC process with a new configuration.

Performs a stop -> rewrite config -> start cycle since eRPC does not support SIGHUP-based config reload natively.

Parameters:

Name Type Description Default
process ERPCProcess

The running ERPCProcess to update.

required
new_config ERPCConfig

The new configuration to apply.

required

Returns:

Type Description
ConfigDiff

A ConfigDiff describing what changed.

Raises:

Type Description
ERPCNotRunning

If the process is not currently running.

Source code in erpc/dynamic.py
def update_config(process: ERPCProcess, new_config: ERPCConfig) -> ConfigDiff:
    """Update a running eRPC process with a new configuration.

    Performs a stop -> rewrite config -> start cycle since eRPC does not
    support SIGHUP-based config reload natively.

    Args:
        process: The running ERPCProcess to update.
        new_config: The new configuration to apply.

    Returns:
        A ConfigDiff describing what changed.

    Raises:
        ERPCNotRunning: If the process is not currently running.

    """
    if not process.is_running:
        raise ERPCNotRunning("Cannot update config: eRPC is not running")

    diff = _diff_configs(process.config, new_config)
    process.config = new_config
    process.stop()
    process.start()
    return diff

add_upstream

add_upstream(process: ERPCProcess, chain_id: int, endpoint: str) -> ConfigDiff

Add an upstream endpoint to a running eRPC process.

If the chain ID does not exist yet, it will be created.

Parameters:

Name Type Description Default
process ERPCProcess

The running ERPCProcess to update.

required
chain_id int

EVM chain identifier.

required
endpoint str

RPC endpoint URL to add.

required

Returns:

Type Description
ConfigDiff

A ConfigDiff describing what changed.

Raises:

Type Description
ERPCNotRunning

If the process is not currently running.

Source code in erpc/dynamic.py
def add_upstream(process: ERPCProcess, chain_id: int, endpoint: str) -> ConfigDiff:
    """Add an upstream endpoint to a running eRPC process.

    If the chain ID does not exist yet, it will be created.

    Args:
        process: The running ERPCProcess to update.
        chain_id: EVM chain identifier.
        endpoint: RPC endpoint URL to add.

    Returns:
        A ConfigDiff describing what changed.

    Raises:
        ERPCNotRunning: If the process is not currently running.

    """
    new_upstreams = deepcopy(process.config.upstreams)
    if chain_id in new_upstreams:
        new_upstreams[chain_id].append(endpoint)
    else:
        new_upstreams[chain_id] = [endpoint]

    new_config = _clone_config_with_upstreams(process.config, new_upstreams)
    return update_config(process, new_config)

remove_upstream

remove_upstream(process: ERPCProcess, chain_id: int, endpoint: str) -> ConfigDiff

Remove an upstream endpoint from a running eRPC process.

If the endpoint is the last one for a chain, the chain is removed entirely.

Parameters:

Name Type Description Default
process ERPCProcess

The running ERPCProcess to update.

required
chain_id int

EVM chain identifier.

required
endpoint str

RPC endpoint URL to remove.

required

Returns:

Type Description
ConfigDiff

A ConfigDiff describing what changed.

Raises:

Type Description
ERPCNotRunning

If the process is not currently running.

ValueError

If the endpoint is not found for the given chain.

Source code in erpc/dynamic.py
def remove_upstream(process: ERPCProcess, chain_id: int, endpoint: str) -> ConfigDiff:
    """Remove an upstream endpoint from a running eRPC process.

    If the endpoint is the last one for a chain, the chain is removed entirely.

    Args:
        process: The running ERPCProcess to update.
        chain_id: EVM chain identifier.
        endpoint: RPC endpoint URL to remove.

    Returns:
        A ConfigDiff describing what changed.

    Raises:
        ERPCNotRunning: If the process is not currently running.
        ValueError: If the endpoint is not found for the given chain.

    """
    if chain_id not in process.config.upstreams:
        msg = f"Endpoint not found for chain {chain_id}: {endpoint}"
        raise ValueError(msg)

    if endpoint not in process.config.upstreams[chain_id]:
        msg = f"Endpoint not found for chain {chain_id}: {endpoint}"
        raise ValueError(msg)

    new_upstreams = deepcopy(process.config.upstreams)
    new_upstreams[chain_id].remove(endpoint)
    if not new_upstreams[chain_id]:
        del new_upstreams[chain_id]

    new_config = _clone_config_with_upstreams(process.config, new_upstreams)
    return update_config(process, new_config)