Skip to content

Database

erpc.database

eRPC database and cache configuration.

Provides dataclasses for configuring eRPC's caching layer, including multiple storage backends (memory, Redis, PostgreSQL, DynamoDB), cache policies with finality awareness, and compression settings.

Examples:

>>> from erpc.database import DatabaseConfig, MemoryConnector, CachePolicy
>>> db = DatabaseConfig(
...     connectors=[MemoryConnector(id="mem", max_items=10_000)],
...     policies=[CachePolicy(connector="mem", ttl="30s")],
... )
>>> db.to_dict()["connectors"][0]["driver"]
'memory'

Connector

Bases: Protocol

Protocol for database connector types.

Source code in erpc/database.py
class Connector(Protocol):
    """Protocol for database connector types.

    This is a structural interface: any object that exposes an ``id``
    string attribute and a ``to_dict()`` method satisfies it.

    """

    # Identifier of the connector.
    id: str

    def to_dict(self) -> dict[str, Any]:
        """Serialize connector to eRPC config dict.

        Returns:
            Mapping that describes the connector in eRPC config form.

        """
        ...  # pragma: no cover

to_dict

to_dict() -> dict[str, Any]

Serialize connector to eRPC config dict.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Serialize connector to eRPC config dict.

    Returns:
        Mapping that describes the connector in eRPC config form.

    """
    ...  # pragma: no cover

TLSConfig dataclass

TLS certificate configuration for secure connections.

Parameters:

Name Type Description Default
cert_file str

Path to the client certificate file.

required
key_file str

Path to the client key file.

required
ca_file str

Path to the CA certificate file.

required
Source code in erpc/database.py
@dataclass
class TLSConfig:
    """Certificate file locations used for TLS-secured connections.

    Args:
        cert_file: Location of the client certificate file.
        key_file: Location of the client key file.
        ca_file: Location of the CA certificate file.

    Examples:
        >>> tls = TLSConfig(cert_file="c.pem", key_file="k.pem", ca_file="ca.pem")
        >>> tls.to_dict()["caFile"]
        'ca.pem'

    """

    cert_file: str
    key_file: str
    ca_file: str

    def to_dict(self) -> dict[str, str]:
        """Convert the TLS settings into an eRPC config mapping.

        Returns:
            Mapping with the camelCase keys ``certFile``, ``keyFile`` and ``caFile``.

        """
        return dict(
            certFile=self.cert_file,
            keyFile=self.key_file,
            caFile=self.ca_file,
        )

to_dict

to_dict() -> dict[str, str]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, str]

Dict with camelCase keys matching eRPC YAML schema.

Source code in erpc/database.py
def to_dict(self) -> dict[str, str]:
    """Convert the TLS settings into an eRPC config mapping.

    Returns:
        Mapping with the camelCase keys ``certFile``, ``keyFile`` and ``caFile``.

    """
    return dict(
        certFile=self.cert_file,
        keyFile=self.key_file,
        caFile=self.ca_file,
    )

MemoryConnector dataclass

In-memory cache connector.

Parameters:

Name Type Description Default
id str

Unique connector identifier referenced by cache policies.

required
max_items int

Maximum number of items to store in memory.

required

Examples:

>>> conn = MemoryConnector(id="mem1", max_items=10_000)
>>> conn.to_dict()["driver"]
'memory'
Source code in erpc/database.py
@dataclass
class MemoryConnector:
    """Cache connector backed by the in-memory driver.

    Args:
        id: Identifier that cache policies use to refer to this connector.
        max_items: Largest number of items to keep in memory.

    Examples:
        >>> conn = MemoryConnector(id="mem1", max_items=10_000)
        >>> conn.to_dict()["memory"]
        {'maxItems': 10000}

    """

    id: str
    max_items: int

    def to_dict(self) -> dict[str, Any]:
        """Convert the connector into an eRPC config mapping.

        Returns:
            Mapping holding the connector id, the ``memory`` driver name and
                a nested ``memory`` section with ``maxItems``.

        """
        memory_section = {"maxItems": self.max_items}
        return dict(id=self.id, driver="memory", memory=memory_section)

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with driver and memory configuration.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the connector into an eRPC config mapping.

    Returns:
        Mapping holding the connector id, the ``memory`` driver name and
            a nested ``memory`` section with ``maxItems``.

    """
    memory_section = {"maxItems": self.max_items}
    return dict(id=self.id, driver="memory", memory=memory_section)

RedisConnector dataclass

Redis cache connector.

Parameters:

Name Type Description Default
id str

Unique connector identifier referenced by cache policies.

required
uri str

Redis connection URI (e.g., redis://localhost:6379).

required
tls TLSConfig | None

Optional TLS configuration for secure connections.

None
pool_size int | None

Optional connection pool size.

None

Examples:

>>> conn = RedisConnector(id="redis1", uri="redis://localhost:6379")
>>> conn.to_dict()["driver"]
'redis'
Source code in erpc/database.py
@dataclass
class RedisConnector:
    """Cache connector backed by the Redis driver.

    Args:
        id: Identifier that cache policies use to refer to this connector.
        uri: Connection URI of the Redis server (e.g., ``redis://localhost:6379``).
        tls: TLS settings for secure connections; omitted from the output when ``None``.
        pool_size: Connection pool size; omitted from the output when ``None``.

    Examples:
        >>> conn = RedisConnector(id="redis1", uri="redis://localhost:6379", pool_size=8)
        >>> conn.to_dict()["redis"]
        {'uri': 'redis://localhost:6379', 'poolSize': 8}

    """

    id: str
    uri: str
    tls: TLSConfig | None = None
    pool_size: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the connector into an eRPC config mapping.

        Returns:
            Mapping holding the connector id, the ``redis`` driver name and a
                nested ``redis`` section that contains only the options that are set.

        """
        # Each optional setting contributes either one entry or nothing at all.
        tls_part = {} if self.tls is None else {"tls": self.tls.to_dict()}
        pool_part = {} if self.pool_size is None else {"poolSize": self.pool_size}
        redis_section: dict[str, Any] = {"uri": self.uri, **tls_part, **pool_part}
        return dict(id=self.id, driver="redis", redis=redis_section)

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with driver and redis configuration.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the connector into an eRPC config mapping.

    Returns:
        Mapping holding the connector id, the ``redis`` driver name and a
            nested ``redis`` section that contains only the options that are set.

    """
    # Each optional setting contributes either one entry or nothing at all.
    tls_part = {} if self.tls is None else {"tls": self.tls.to_dict()}
    pool_part = {} if self.pool_size is None else {"poolSize": self.pool_size}
    redis_section: dict[str, Any] = {"uri": self.uri, **tls_part, **pool_part}
    return dict(id=self.id, driver="redis", redis=redis_section)

PostgresConnector dataclass

PostgreSQL cache connector.

Parameters:

Name Type Description Default
id str

Unique connector identifier referenced by cache policies.

required
uri str

PostgreSQL connection URI.

required
table str

Table name for cache storage.

required

Examples:

>>> conn = PostgresConnector(id="pg1", uri="postgres://host/db", table="cache")
>>> conn.to_dict()["driver"]
'postgresql'
Source code in erpc/database.py
@dataclass
class PostgresConnector:
    """Cache connector backed by the PostgreSQL driver.

    Args:
        id: Identifier that cache policies use to refer to this connector.
        uri: Connection URI of the PostgreSQL database.
        table: Name of the table that stores cache entries.

    Examples:
        >>> conn = PostgresConnector(id="pg1", uri="postgres://host/db", table="cache")
        >>> conn.to_dict()["postgresql"]["table"]
        'cache'

    """

    id: str
    uri: str
    table: str

    def to_dict(self) -> dict[str, Any]:
        """Convert the connector into an eRPC config mapping.

        Returns:
            Mapping holding the connector id, the ``postgresql`` driver name and a
                nested ``postgresql`` section with ``connectionUri`` and ``table``.

        """
        pg_section = dict(connectionUri=self.uri, table=self.table)
        return dict(id=self.id, driver="postgresql", postgresql=pg_section)

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with driver and postgresql configuration.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the connector into an eRPC config mapping.

    Returns:
        Mapping holding the connector id, the ``postgresql`` driver name and a
            nested ``postgresql`` section with ``connectionUri`` and ``table``.

    """
    pg_section = dict(connectionUri=self.uri, table=self.table)
    return dict(id=self.id, driver="postgresql", postgresql=pg_section)

DynamoDBConnector dataclass

AWS DynamoDB cache connector.

Parameters:

Name Type Description Default
id str

Unique connector identifier referenced by cache policies.

required
table str

DynamoDB table name.

required
region str

AWS region (e.g., us-east-1).

required
partition_key_name str

Name of the partition key attribute.

required
range_key_name str

Name of the range/sort key attribute.

required
ttl_attribute str

Name of the TTL attribute for automatic expiration.

required

Examples:

>>> conn = DynamoDBConnector(
...     id="ddb1",
...     table="cache",
...     region="us-east-1",
...     partition_key_name="pk",
...     range_key_name="sk",
...     ttl_attribute="ttl",
... )
>>> conn.to_dict()["driver"]
'dynamodb'
Source code in erpc/database.py
@dataclass
class DynamoDBConnector:
    """Cache connector backed by the DynamoDB driver.

    Args:
        id: Identifier that cache policies use to refer to this connector.
        table: Name of the DynamoDB table.
        region: AWS region of the table (e.g., ``us-east-1``).
        partition_key_name: Attribute name used as the partition key.
        range_key_name: Attribute name used as the range/sort key.
        ttl_attribute: Attribute name that carries the TTL for automatic expiration.

    Examples:
        >>> conn = DynamoDBConnector(
        ...     id="ddb1",
        ...     table="cache",
        ...     region="us-east-1",
        ...     partition_key_name="pk",
        ...     range_key_name="sk",
        ...     ttl_attribute="ttl",
        ... )
        >>> conn.to_dict()["dynamodb"]["partitionKeyName"]
        'pk'

    """

    id: str
    table: str
    region: str
    partition_key_name: str
    range_key_name: str
    ttl_attribute: str

    def to_dict(self) -> dict[str, Any]:
        """Convert the connector into an eRPC config mapping.

        Returns:
            Mapping holding the connector id, the ``dynamodb`` driver name and a
                nested ``dynamodb`` section with camelCase keys.

        """
        # Output key -> dataclass field it is read from, in emission order.
        field_by_key = {
            "table": "table",
            "region": "region",
            "partitionKeyName": "partition_key_name",
            "rangeKeyName": "range_key_name",
            "ttlAttribute": "ttl_attribute",
        }
        dynamo_section = {key: getattr(self, attr) for key, attr in field_by_key.items()}
        return dict(id=self.id, driver="dynamodb", dynamodb=dynamo_section)

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with driver and dynamodb configuration.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the connector into an eRPC config mapping.

    Returns:
        Mapping holding the connector id, the ``dynamodb`` driver name and a
            nested ``dynamodb`` section with camelCase keys.

    """
    dynamo_section = dict(
        table=self.table,
        region=self.region,
        partitionKeyName=self.partition_key_name,
        rangeKeyName=self.range_key_name,
        ttlAttribute=self.ttl_attribute,
    )
    return dict(id=self.id, driver="dynamodb", dynamodb=dynamo_section)

CompressionConfig dataclass

Cache compression configuration.

Parameters:

Name Type Description Default
algorithm str

Compression algorithm (e.g., zstd, gzip).

'zstd'
level str

Compression level (e.g., default, fast, best).

'default'
threshold int

Minimum item size in bytes before compression is applied.

1024

Examples:

>>> comp = CompressionConfig()
>>> comp.algorithm
'zstd'
Source code in erpc/database.py
@dataclass
class CompressionConfig:
    """Compression settings for cached items.

    Args:
        algorithm: Name of the compression algorithm (e.g., ``zstd``, ``gzip``).
        level: Name of the compression level (e.g., ``default``, ``fast``, ``best``).
        threshold: Smallest item size in bytes for which compression is applied.

    Examples:
        >>> CompressionConfig().to_dict()
        {'algorithm': 'zstd', 'level': 'default', 'threshold': 1024}

    """

    algorithm: str = "zstd"
    level: str = "default"
    threshold: int = 1024

    def to_dict(self) -> dict[str, Any]:
        """Convert the compression settings into an eRPC config mapping.

        Returns:
            Mapping with the keys ``algorithm``, ``level`` and ``threshold``.

        """
        return dict(
            algorithm=self.algorithm,
            level=self.level,
            threshold=self.threshold,
        )

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with compression settings.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the compression settings into an eRPC config mapping.

    Returns:
        Mapping with the keys ``algorithm``, ``level`` and ``threshold``.

    """
    return dict(
        algorithm=self.algorithm,
        level=self.level,
        threshold=self.threshold,
    )

CachePolicy dataclass

Cache policy rule with finality awareness.

Defines caching behavior for specific methods, networks, and finality states.

Parameters:

Name Type Description Default
connector str

ID of the connector to use for this policy.

required
ttl str

Time-to-live duration string (e.g., 30s, 1h).

required
network str | None

Optional network filter (e.g., evm:1).

None
method str | None

Optional RPC method filter (e.g., eth_getBlockByNumber).

None
finality str | None

Optional finality state filter. One of finalized, unfinalized, realtime, or unknown.

None
empty str | None

Optional empty response handling. One of ignore, allow, or only.

None
min_item_size int | None

Optional minimum item size in bytes to cache.

None
max_item_size int | None

Optional maximum item size in bytes to cache.

None

Examples:

>>> policy = CachePolicy(connector="mem1", ttl="30s", finality="finalized")
>>> policy.to_dict()["finality"]
'finalized'
Source code in erpc/database.py
@dataclass
class CachePolicy:
    """Cache policy rule with finality awareness.

    Defines caching behavior for specific methods, networks, and
    finality states.

    Args:
        connector: ID of the connector to use for this policy.
        ttl: Time-to-live duration string (e.g., ``30s``, ``1h``).
        network: Optional network filter (e.g., ``evm:1``).
        method: Optional RPC method filter (e.g., ``eth_getBlockByNumber``).
        finality: Optional finality state filter. One of ``finalized``,
            ``unfinalized``, ``realtime``, or ``unknown``.
        empty: Optional empty response handling. One of ``ignore``,
            ``allow``, or ``only``.
        min_item_size: Optional minimum item size in bytes to cache.
        max_item_size: Optional maximum item size in bytes to cache.

    Examples:
        >>> policy = CachePolicy(connector="mem1", ttl="30s", finality="finalized")
        >>> policy.to_dict()["finality"]
        'finalized'

    """

    connector: str
    ttl: str
    network: str | None = None
    method: str | None = None
    finality: str | None = None
    empty: str | None = None
    min_item_size: int | None = None
    max_item_size: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to eRPC config dict.

        Returns:
            Dict with only non-None fields, using camelCase keys.

        """
        result: dict[str, Any] = {
            "connector": self.connector,
            "ttl": self.ttl,
        }
        if self.network is not None:
            result["network"] = self.network
        if self.method is not None:
            result["method"] = self.method
        if self.finality is not None:
            result["finality"] = self.finality
        if self.empty is not None:
            result["empty"] = self.empty
        if self.min_item_size is not None:
            result["minItemSize"] = self.min_item_size
        if self.max_item_size is not None:
            result["maxItemSize"] = self.max_item_size
        return result

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict with only non-None fields, using camelCase keys.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the policy into an eRPC config mapping.

    Returns:
        Mapping with camelCase keys; fields that are ``None`` are omitted.

    """
    serialized: dict[str, Any] = {"connector": self.connector, "ttl": self.ttl}
    # Optional entries in emission order; only those that are set are kept.
    optional_entries = (
        ("network", self.network),
        ("method", self.method),
        ("finality", self.finality),
        ("empty", self.empty),
        ("minItemSize", self.min_item_size),
        ("maxItemSize", self.max_item_size),
    )
    serialized.update(
        (key, value) for key, value in optional_entries if value is not None
    )
    return serialized

DatabaseConfig dataclass

Top-level database/cache configuration for eRPC.

Composes connectors, cache policies, and optional compression into the database section of an eRPC config.

Parameters:

Name Type Description Default
connectors list[ConnectorType]

List of storage backend connectors.

list()
policies list[CachePolicy]

List of cache policy rules.

list()
compression CompressionConfig | None

Optional compression configuration.

None

Examples:

>>> db = DatabaseConfig(
...     connectors=[MemoryConnector(id="mem", max_items=10_000)],
...     policies=[CachePolicy(connector="mem", ttl="30s")],
... )
>>> len(db.to_dict()["connectors"])
1
Source code in erpc/database.py
@dataclass
class DatabaseConfig:
    """Root of the database/cache section of an eRPC configuration.

    Bundles the storage connectors, the cache policies and the optional
    compression settings that together form the ``database`` section of
    an eRPC config.

    Args:
        connectors: Storage backend connectors.
        policies: Cache policy rules.
        compression: Compression settings; omitted from the output when ``None``.

    Examples:
        >>> db = DatabaseConfig(
        ...     connectors=[MemoryConnector(id="mem", max_items=10_000)],
        ...     policies=[CachePolicy(connector="mem", ttl="30s")],
        ... )
        >>> db.to_dict()["policies"]
        [{'connector': 'mem', 'ttl': '30s'}]

    """

    connectors: list[ConnectorType] = field(default_factory=list)
    policies: list[CachePolicy] = field(default_factory=list)
    compression: CompressionConfig | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the whole database configuration into an eRPC config mapping.

        Returns:
            Dict suitable for inclusion in eRPC YAML under the ``database`` key.

        """
        connector_dicts = [connector.to_dict() for connector in self.connectors]
        policy_dicts = [policy.to_dict() for policy in self.policies]
        # The compression entry exists only when a compression config was given.
        compression_part = (
            {}
            if self.compression is None
            else {"compression": self.compression.to_dict()}
        )
        return {
            "connectors": connector_dicts,
            "policies": policy_dicts,
            **compression_part,
        }

to_dict

to_dict() -> dict[str, Any]

Serialize to eRPC config dict.

Returns:

Type Description
dict[str, Any]

Dict suitable for inclusion in eRPC YAML under the database key.

Source code in erpc/database.py
def to_dict(self) -> dict[str, Any]:
    """Convert the whole database configuration into an eRPC config mapping.

    Returns:
        Dict suitable for inclusion in eRPC YAML under the ``database`` key.

    """
    connector_dicts = [connector.to_dict() for connector in self.connectors]
    policy_dicts = [policy.to_dict() for policy in self.policies]
    # The compression entry exists only when a compression config was given.
    compression_part = (
        {}
        if self.compression is None
        else {"compression": self.compression.to_dict()}
    )
    return {
        "connectors": connector_dicts,
        "policies": policy_dicts,
        **compression_part,
    }