From f9800e9620db7b7452279d1487977926f01b2233 Mon Sep 17 00:00:00 2001 From: Gabor Szabo Date: Wed, 27 Nov 2024 15:58:36 +0100 Subject: [PATCH] Add Certificate types for staking and voting delegation --- pycardano/certificate.py | 242 +++++++++++++++++++++++++++++++++++++++ pycardano/txbuilder.py | 34 +++++- 2 files changed, 274 insertions(+), 2 deletions(-) diff --git a/pycardano/certificate.py b/pycardano/certificate.py index c25ba0c2..67bac613 100644 --- a/pycardano/certificate.py +++ b/pycardano/certificate.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from enum import Enum, unique from typing import Optional, Tuple, Type, Union from pycardano.exception import DeserializeException @@ -15,6 +16,15 @@ "StakeDelegation", "PoolRegistration", "PoolRetirement", + "StakeRegistrationConway", + "StakeDeregistrationConway", + "VoteDelegation", + "StakeAndVoteDelegation", + "StakeRegistrationAndDelegation", + "StakeRegistrationAndVoteDelegation", + "StakeRegistrationAndDelegationAndVoteDelegation", + "DRep", + "DRepKind", ] from pycardano.pool_params import PoolParams @@ -168,10 +178,242 @@ def from_primitive( raise DeserializeException(f"Invalid PoolRetirement type {values[0]}") +@dataclass(repr=False) +class StakeRegistrationConway(ArrayCBORSerializable): + _CODE: int = field(init=False, default=7) + + stake_credential: StakeCredential + coin: int + + def __post_init__(self): + self._CODE = 7 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeRegistrationConway], values: Union[list, tuple] + ) -> StakeRegistrationConway: + if values[0] == 7: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + coin=values[2], + ) + else: + raise DeserializeException( + f"Invalid StakeRegistrationConway type {values[0]}" + ) + + +@dataclass(repr=False) +class StakeDeregistrationConway(ArrayCBORSerializable): + _CODE: int = field(init=False, default=8) + + stake_credential: StakeCredential + coin: int + + def __post_init__(self): + self._CODE = 8 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeDeregistrationConway], values: Union[list, tuple] + ) -> StakeDeregistrationConway: + if values[0] == 8: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + coin=values[2], + ) + else: + raise DeserializeException( + f"Invalid StakeDeregistrationConway type {values[0]}" + ) + + +@dataclass(repr=False) +class VoteDelegation(ArrayCBORSerializable): + _CODE: int = field(init=False, default=9) + + stake_credential: StakeCredential + drep: DRep + + def __post_init__(self): + self._CODE = 9 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[VoteDelegation], values: Union[list, tuple] + ) -> VoteDelegation: + if values[0] == 9: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + drep=DRep.from_primitive(values[2]), + ) + else: + raise DeserializeException(f"Invalid VoteDelegation type {values[0]}") + + +@dataclass(repr=False) +class StakeAndVoteDelegation(ArrayCBORSerializable): + _CODE: int = field(init=False, default=10) + + stake_credential: StakeCredential + pool_keyhash: PoolKeyHash + drep: DRep + + def __post_init__(self): + self._CODE = 10 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeAndVoteDelegation], values: Union[list, tuple] + ) -> StakeAndVoteDelegation: + if values[0] == 10: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + pool_keyhash=PoolKeyHash.from_primitive(values[2]), + drep=DRep.from_primitive(values[3]), + ) + else: + raise DeserializeException( + f"Invalid StakeAndVoteDelegation type {values[0]}" + ) + + +@dataclass(repr=False) +class StakeRegistrationAndDelegation(ArrayCBORSerializable): + _CODE: int = field(init=False, default=11) + + stake_credential: StakeCredential + pool_keyhash: PoolKeyHash + coin: int + + def __post_init__(self): + self._CODE = 11 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeRegistrationAndDelegation], values: Union[list, tuple] + ) -> StakeRegistrationAndDelegation: + if values[0] == 11: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + pool_keyhash=PoolKeyHash.from_primitive(values[2]), + coin=values[3], + ) + else: + raise DeserializeException(f"Invalid {cls.__name__} type {values[0]}") + + +@dataclass(repr=False) +class StakeRegistrationAndVoteDelegation(ArrayCBORSerializable): + _CODE: int = field(init=False, default=12) + + stake_credential: StakeCredential + drep: DRep + coin: int + + def __post_init__(self): + self._CODE = 12 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeRegistrationAndVoteDelegation], values: Union[list, tuple] + ) -> StakeRegistrationAndVoteDelegation: + if values[0] == 12: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + drep=DRep.from_primitive(values[2]), + coin=values[3], + ) + else: + raise DeserializeException(f"Invalid {cls.__name__} type {values[0]}") + + +@dataclass(repr=False) +class StakeRegistrationAndDelegationAndVoteDelegation(ArrayCBORSerializable): + _CODE: int = field(init=False, default=13) + + stake_credential: StakeCredential + pool_keyhash: PoolKeyHash + drep: DRep + coin: int + + def __post_init__(self): + self._CODE = 13 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[StakeRegistrationAndDelegationAndVoteDelegation], + values: Union[list, tuple], + ) -> StakeRegistrationAndDelegationAndVoteDelegation: + if values[0] == 13: + return cls( + stake_credential=StakeCredential.from_primitive(values[1]), + pool_keyhash=PoolKeyHash.from_primitive(values[2]), + drep=DRep.from_primitive(values[3]), + coin=values[4], + ) + else: + raise DeserializeException(f"Invalid {cls.__name__} type {values[0]}") + + +@unique +class DRepKind(Enum): + VERIFICATION_KEY_HASH = 0 + SCRIPT_HASH = 1 + ALWAYS_ABSTAIN = 2 + ALWAYS_NO_CONFIDENCE = 3 + + +@dataclass(repr=False) +class DRep(ArrayCBORSerializable): + kind: DRepKind + credential: Optional[Union[VerificationKeyHash, ScriptHash]] = field( + default=None, metadata={"optional": True} + ) + + @classmethod + @limit_primitive_type(list) + def from_primitive(cls: Type[DRep], values: Union[list, tuple]) -> DRep: + try: + kind = DRepKind(values[0]) + except ValueError: + raise DeserializeException(f"Invalid DRep type {values[0]}") + + if kind == DRepKind.VERIFICATION_KEY_HASH: + return cls(kind=kind, credential=VerificationKeyHash(values[1])) + elif kind == DRepKind.SCRIPT_HASH: + return cls(kind=kind, credential=ScriptHash(values[1])) + elif kind == DRepKind.ALWAYS_ABSTAIN: + return cls(kind=kind) + elif kind == DRepKind.ALWAYS_NO_CONFIDENCE: + return cls(kind=kind) + else: + raise DeserializeException(f"Invalid DRep type {values[0]}") + + def to_primitive(self): + if self.credential is not None: + return [self.kind.value, self.credential.to_primitive()] + return [self.kind.value] + + Certificate = Union[ StakeRegistration, StakeDeregistration, StakeDelegation, PoolRegistration, PoolRetirement, + StakeRegistrationConway, + StakeDeregistrationConway, + VoteDelegation, + StakeAndVoteDelegation, + StakeRegistrationAndDelegation, + StakeRegistrationAndVoteDelegation, + StakeRegistrationAndDelegationAndVoteDelegation, ] diff --git a/pycardano/txbuilder.py b/pycardano/txbuilder.py index f1f2ff50..5d1d3fb7 100644 --- a/pycardano/txbuilder.py +++ b/pycardano/txbuilder.py @@ -11,10 +11,17 @@ Certificate, PoolRegistration, PoolRetirement, + StakeAndVoteDelegation, StakeCredential, StakeDelegation, StakeDeregistration, + StakeDeregistrationConway, StakeRegistration, + StakeRegistrationAndDelegation, + StakeRegistrationAndDelegationAndVoteDelegation, + StakeRegistrationAndVoteDelegation, + StakeRegistrationConway, + VoteDelegation, ) from pycardano.coinselection import ( LargestFirstSelector, @@ -865,7 +872,19 @@ def _check_and_add_vkey(stake_credential: StakeCredential): if self.certificates: for cert in self.certificates: if isinstance( - cert, (StakeRegistration, StakeDeregistration, StakeDelegation) + cert, + ( + StakeRegistration, + StakeDeregistration, + StakeDelegation, + StakeRegistrationConway, + StakeDeregistrationConway, + VoteDelegation, + StakeAndVoteDelegation, + StakeRegistrationAndDelegation, + StakeRegistrationAndVoteDelegation, + StakeRegistrationAndDelegationAndVoteDelegation, + ), ): _check_and_add_vkey(cert.stake_credential) elif isinstance(cert, PoolRegistration): @@ -876,6 +895,7 @@ def _check_and_add_vkey(stake_credential: StakeCredential): def _get_total_key_deposit(self): stake_registration_certs = set() + stake_registration_certs_with_explicit_deposit = set() stake_pool_registration_certs = set() protocol_params = self.context.protocol_param @@ -884,6 +904,16 @@ def _get_total_key_deposit(self): for cert in self.certificates: if isinstance(cert, StakeRegistration): stake_registration_certs.add(cert.stake_credential.credential) + elif isinstance( + cert, + ( + StakeRegistrationConway, + StakeRegistrationAndDelegation, + StakeRegistrationAndVoteDelegation, + StakeRegistrationAndDelegationAndVoteDelegation, + ), + ): + stake_registration_certs_with_explicit_deposit.add(cert.coin) elif ( isinstance(cert, PoolRegistration) and self.initial_stake_pool_registration @@ -892,7 +922,7 @@ def _get_total_key_deposit(self): stake_registration_deposit = protocol_params.key_deposit * len( stake_registration_certs - ) + ) + sum(stake_registration_certs_with_explicit_deposit) stake_pool_registration_deposit = protocol_params.pool_deposit * len( stake_pool_registration_certs )