Skip to content

Federation API

Low level api

__all__ special

Classes

FederationABC

federation, get or remote objects and tables

Source code in fate_arch/abc/_federation.py
class FederationABC(metaclass=ABCMeta):
    """
    federation, get or remote objects and tables
    """

    @abc.abstractmethod
    def get(self, name: str,
            tag: str,
            parties: typing.List[Party],
            gc: GarbageCollectionABC) -> typing.List:
        """
        get objects/tables from ``parties``

        Parameters
        ----------
        name: str
           name of transfer variable
        tag: str
           tag to distinguish each transfer
        parties: typing.List[Party]
           parties to get objects/tables from
        gc: GarbageCollectionABC
           used to do some clean jobs

        Returns
        -------
        list
           a list of object or a list of table get from parties with same order of `parties`

        """
        ...

    @abc.abstractmethod
    def remote(self, v,
               name: str,
               tag: str,
               parties: typing.List[Party],
               gc: GarbageCollectionABC) -> typing.NoReturn:
        """
        remote object/table to ``parties``

        Parameters
        ----------
        v: object or table
           object/table to remote
        name: str
           name of transfer variable
        tag: str
           tag to distinguish each transfer
        parties: typing.List[Party]
           parties to remote object/table to
        gc: GarbageCollectionABC
           used to do some clean jobs

        Returns
        -------
        Notes
        """
        ...

Methods

get(self, name, tag, parties, gc)

get objects/tables from parties

Parameters

name: str name of transfer variable tag: str tag to distinguish each transfer parties: typing.List[Party] parties to get objects/tables from gc: GarbageCollectionABC used to do some clean jobs

Returns

list a list of object or a list of table get from parties with same order of parties

Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def get(self, name: str,
        tag: str,
        parties: typing.List[Party],
        gc: GarbageCollectionABC) -> typing.List:
    """
    get objects/tables from ``parties``

    Parameters
    ----------
    name: str
       name of transfer variable
    tag: str
       tag to distinguish each transfer
    parties: typing.List[Party]
       parties to get objects/tables from
    gc: GarbageCollectionABC
       used to do some clean jobs

    Returns
    -------
    list
       a list of object or a list of table get from parties with same order of `parties`

    """
    ...
remote(self, v, name, tag, parties, gc)

remote object/table to parties

Parameters

v: object or table object/table to remote name: str name of transfer variable tag: str tag to distinguish each transfer parties: typing.List[Party] parties to remote object/table to gc: GarbageCollectionABC used to do some clean jobs

Returns

Notes

Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def remote(self, v,
           name: str,
           tag: str,
           parties: typing.List[Party],
           gc: GarbageCollectionABC) -> typing.NoReturn:
    """
    remote object/table to ``parties``

    Parameters
    ----------
    v: object or table
       object/table to remote
    name: str
       name of transfer variable
    tag: str
       tag to distinguish each transfer
    parties: typing.List[Party]
       parties to remote object/table to
    gc: GarbageCollectionABC
       used to do some clean jobs

    Returns
    -------
    Notes
    """
    ...

user api

remoting or getting an object(table) from other parties is quite easy using apis provided in Variable. First to create an instance of BaseTransferVariable, which is simply a collection of Variables:

from federatedml.transfer_variable.transfer_class import secure_add_example_transfer_variable
variable = secure_add_example_transfer_variable.SecureAddExampleTransferVariable()

Then remote or get object(table) by variable provided by this instance:

# remote
variable.guest_share.remote("from guest")

# get
variable.guest_share.get()

LOGGER

__all__ special

Classes

FederationTagNamespace

Source code in fate_arch/federation/transfer_variable.py
class FederationTagNamespace(object):
    __namespace = "default"

    @classmethod
    def set_namespace(cls, namespace):
        cls.__namespace = namespace

    @classmethod
    def generate_tag(cls, *suffix):
        tags = (cls.__namespace, *map(str, suffix))
        return ".".join(tags)
set_namespace(namespace) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def set_namespace(cls, namespace):
    cls.__namespace = namespace
generate_tag(*suffix) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def generate_tag(cls, *suffix):
    tags = (cls.__namespace, *map(str, suffix))
    return ".".join(tags)

Variable

variable to distinguish federation by name

Source code in fate_arch/federation/transfer_variable.py
class Variable(object):
    """
    variable to distinguish federation by name
    """

    __instances: typing.MutableMapping[str, "Variable"] = {}

    @classmethod
    def get_or_create(
        cls, name, create_func: typing.Callable[[], "Variable"]
    ) -> "Variable":
        if name not in cls.__instances:
            value = create_func()
            cls.__instances[name] = value
        return cls.__instances[name]

    def __init__(
        self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
    ):

        if name in self.__instances:
            raise RuntimeError(
                f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
            )

        assert (
            len(name.split(".")) >= 3
        ), "incorrect name format, should be `module_name.class_name.variable_name`"
        self._name = name
        self._src = src
        self._dst = dst
        self._get_gc = IterationGC()
        self._remote_gc = IterationGC()
        self._use_short_name = True
        self._short_name = self._get_short_name(self._name)

    @staticmethod
    def _get_short_name(name):
        fix_sized = hashlib.blake2b(name.encode("utf-8"), digest_size=10).hexdigest()
        _, right = name.rsplit(".", 1)
        return f"hash.{fix_sized}.{right}"

    # copy never create a new instance
    def __copy__(self):
        return self

    # deepcopy never create a new instance
    def __deepcopy__(self, memo):
        return self

    def set_preserve_num(self, n):
        self._get_gc.set_capacity(n)
        self._remote_gc.set_capacity(n)
        return self

    def disable_auto_clean(self):
        self._get_gc.disable()
        self._remote_gc.disable()
        return self

    def clean(self):
        self._get_gc.clean()
        self._remote_gc.clean()

    def remote_parties(
        self,
        obj,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        remote object to specified parties

        Parameters
        ----------
        obj: object or table
           object or table to remote
        parties: typing.List[Party]
           parties to remote object/table to
        suffix: str or tuple of str
           suffix used to distinguish federation with in variable

        Returns
        -------
        None
        """
        from fate_arch.session import get_session

        session = get_session()
        if isinstance(parties, Party):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._dst:
                raise RuntimeError(
                    f"not allowed to remote object to {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._src:
            raise RuntimeError(
                f"not allowed to remote object from {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name

        timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
        session.federation.remote(
            v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
        )
        timer.done(session.federation)

        self._remote_gc.gc()

    def get_parties(
        self,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        get objects/tables from specified parties

        Parameters
        ----------
        parties: typing.List[Party]
           parties to remote object/table to
        suffix: str or tuple of str
           suffix used to distinguish federation with in variable

        Returns
        -------
        list
           a list of objects/tables get from parties with same order of ``parties``

        """
        from fate_arch.session import get_session

        session = get_session()
        if not isinstance(parties, list):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._src:
                raise RuntimeError(
                    f"not allowed to get object from {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._dst:
            raise RuntimeError(
                f"not allowed to get object to {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name
        timer = profile.federation_get_timer(name, self._name, tag, local, parties)
        rtn = session.federation.get(
            name=name, tag=tag, parties=parties, gc=self._get_gc
        )
        timer.done(session.federation)

        self._get_gc.gc()

        return rtn

    def remote(self, obj, role=None, idx=-1, suffix=tuple()):
        """
        send obj to other parties.

        Args:
            obj: object to be sent
            role: role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None].
                The default is None, means sent values to parties regardless their party role
            idx: id of party to sent to.
                The default is -1, which means sent values to parties regardless their party id
            suffix: additional tag suffix, the default is tuple()
        """
        from fate_arch.session import get_parties

        party_info = get_parties()
        if idx >= 0 and role is None:
            raise ValueError("role cannot be None if idx specified")

        # get subset of dst roles in runtime conf
        if role is None:
            parties = party_info.roles_to_parties(self._dst, strict=False)
        else:
            if isinstance(role, str):
                role = [role]
            parties = party_info.roles_to_parties(role)

        if idx >= 0:
            if idx >= len(parties):
                raise RuntimeError(
                    f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
                )
            parties = parties[idx]
        return self.remote_parties(obj=obj, parties=parties, suffix=suffix)

    def get(self, idx=-1, role=None, suffix=tuple()):
        """
        get obj from other parties.

        Args:
            idx: id of party to get from.
                The default is -1, which means get values from parties regardless their party id
            suffix: additional tag suffix, the default is tuple()

        Returns:
            object or list of object
        """
        from fate_arch.session import get_parties

        if role is None:
            src_parties = get_parties().roles_to_parties(roles=self._src, strict=False)
        else:
            if isinstance(role, str):
                role = [role]
            src_parties = get_parties().roles_to_parties(roles=role, strict=False)
        if isinstance(idx, list):
            rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
        elif isinstance(idx, int):
            if idx < 0:
                rtn = self.get_parties(parties=src_parties, suffix=suffix)
            else:
                if idx >= len(src_parties):
                    raise RuntimeError(
                        f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                    )
                rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
        else:
            raise ValueError(
                f"illegal idx type: {type(idx)}, supported types: int or list of int"
            )
        return rtn

Methods

get_or_create(name, create_func) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def get_or_create(
    cls, name, create_func: typing.Callable[[], "Variable"]
) -> "Variable":
    if name not in cls.__instances:
        value = create_func()
        cls.__instances[name] = value
    return cls.__instances[name]
__init__(self, name, src, dst) special
Source code in fate_arch/federation/transfer_variable.py
def __init__(
    self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
):

    if name in self.__instances:
        raise RuntimeError(
            f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
        )

    assert (
        len(name.split(".")) >= 3
    ), "incorrect name format, should be `module_name.class_name.variable_name`"
    self._name = name
    self._src = src
    self._dst = dst
    self._get_gc = IterationGC()
    self._remote_gc = IterationGC()
    self._use_short_name = True
    self._short_name = self._get_short_name(self._name)
__copy__(self) special
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    return self
__deepcopy__(self, memo) special
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    return self
set_preserve_num(self, n)
Source code in fate_arch/federation/transfer_variable.py
def set_preserve_num(self, n):
    self._get_gc.set_capacity(n)
    self._remote_gc.set_capacity(n)
    return self
disable_auto_clean(self)
Source code in fate_arch/federation/transfer_variable.py
def disable_auto_clean(self):
    self._get_gc.disable()
    self._remote_gc.disable()
    return self
clean(self)
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    self._get_gc.clean()
    self._remote_gc.clean()
remote_parties(self, obj, parties, suffix=())

remote object to specified parties

Parameters

obj: object or table object or table to remote parties: typing.List[Party] parties to remote object/table to suffix: str or tuple of str suffix used to distinguish federation with in variable

Returns

None

Source code in fate_arch/federation/transfer_variable.py
def remote_parties(
    self,
    obj,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    remote object to specified parties

    Parameters
    ----------
    obj: object or table
       object or table to remote
    parties: typing.List[Party]
       parties to remote object/table to
    suffix: str or tuple of str
       suffix used to distinguish federation with in variable

    Returns
    -------
    None
    """
    from fate_arch.session import get_session

    session = get_session()
    if isinstance(parties, Party):
        parties = [parties]
    if not isinstance(suffix, tuple):
        suffix = (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    for party in parties:
        if party.role not in self._dst:
            raise RuntimeError(
                f"not allowed to remote object to {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._src:
        raise RuntimeError(
            f"not allowed to remote object from {local} using {self._name}"
        )

    name = self._short_name if self._use_short_name else self._name

    timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
    session.federation.remote(
        v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
    )
    timer.done(session.federation)

    self._remote_gc.gc()
get_parties(self, parties, suffix=())

get objects/tables from specified parties

Parameters

parties: typing.List[Party] parties to remote object/table to suffix: str or tuple of str suffix used to distinguish federation with in variable

Returns

list a list of objects/tables get from parties with same order of parties

Source code in fate_arch/federation/transfer_variable.py
def get_parties(
    self,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    get objects/tables from specified parties

    Parameters
    ----------
    parties: typing.List[Party]
       parties to remote object/table to
    suffix: str or tuple of str
       suffix used to distinguish federation with in variable

    Returns
    -------
    list
       a list of objects/tables get from parties with same order of ``parties``

    """
    from fate_arch.session import get_session

    session = get_session()
    if not isinstance(parties, list):
        parties = [parties]
    if not isinstance(suffix, tuple):
        suffix = (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    for party in parties:
        if party.role not in self._src:
            raise RuntimeError(
                f"not allowed to get object from {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._dst:
        raise RuntimeError(
            f"not allowed to get object to {local} using {self._name}"
        )

    name = self._short_name if self._use_short_name else self._name
    timer = profile.federation_get_timer(name, self._name, tag, local, parties)
    rtn = session.federation.get(
        name=name, tag=tag, parties=parties, gc=self._get_gc
    )
    timer.done(session.federation)

    self._get_gc.gc()

    return rtn
remote(self, obj, role=None, idx=-1, suffix=())

send obj to other parties.

Parameters:

Name Type Description Default
obj

object to be sent

required
role

role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None]. The default is None, means sent values to parties regardless their party role

None
idx

id of party to sent to. The default is -1, which means sent values to parties regardless their party id

-1
suffix

additional tag suffix, the default is tuple()

()
Source code in fate_arch/federation/transfer_variable.py
def remote(self, obj, role=None, idx=-1, suffix=tuple()):
    """
    send obj to other parties.

    Args:
        obj: object to be sent
        role: role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None].
            The default is None, means sent values to parties regardless their party role
        idx: id of party to sent to.
            The default is -1, which means sent values to parties regardless their party id
        suffix: additional tag suffix, the default is tuple()
    """
    from fate_arch.session import get_parties

    party_info = get_parties()
    if idx >= 0 and role is None:
        raise ValueError("role cannot be None if idx specified")

    # get subset of dst roles in runtime conf
    if role is None:
        parties = party_info.roles_to_parties(self._dst, strict=False)
    else:
        if isinstance(role, str):
            role = [role]
        parties = party_info.roles_to_parties(role)

    if idx >= 0:
        if idx >= len(parties):
            raise RuntimeError(
                f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
            )
        parties = parties[idx]
    return self.remote_parties(obj=obj, parties=parties, suffix=suffix)
get(self, idx=-1, role=None, suffix=())

get obj from other parties.

Parameters:

Name Type Description Default
idx

id of party to get from. The default is -1, which means get values from parties regardless their party id

-1
suffix

additional tag suffix, the default is tuple()

()

Returns:

Type Description

object or list of object

Source code in fate_arch/federation/transfer_variable.py
def get(self, idx=-1, role=None, suffix=tuple()):
    """
    get obj from other parties.

    Args:
        idx: id of party to get from.
            The default is -1, which means get values from parties regardless their party id
        suffix: additional tag suffix, the default is tuple()

    Returns:
        object or list of object
    """
    from fate_arch.session import get_parties

    if role is None:
        src_parties = get_parties().roles_to_parties(roles=self._src, strict=False)
    else:
        if isinstance(role, str):
            role = [role]
        src_parties = get_parties().roles_to_parties(roles=role, strict=False)
    if isinstance(idx, list):
        rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
    elif isinstance(idx, int):
        if idx < 0:
            rtn = self.get_parties(parties=src_parties, suffix=suffix)
        else:
            if idx >= len(src_parties):
                raise RuntimeError(
                    f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                )
            rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
    else:
        raise ValueError(
            f"illegal idx type: {type(idx)}, supported types: int or list of int"
        )
    return rtn

BaseTransferVariables

Source code in fate_arch/federation/transfer_variable.py
class BaseTransferVariables(object):
    def __init__(self, *args):
        pass

    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self

    @staticmethod
    def set_flowid(flowid):
        """
        set global namespace for federations.

        Parameters
        ----------
        flowid: str
           namespace

        Returns
        -------
        None

        """
        FederationTagNamespace.set_namespace(str(flowid))

    def _create_variable(
        self, name: str, src: typing.Iterable[str], dst: typing.Iterable[str]
    ) -> Variable:
        full_name = f"{self.__module__}.{self.__class__.__name__}.{name}"
        return Variable.get_or_create(
            full_name, lambda: Variable(name=full_name, src=tuple(src), dst=tuple(dst))
        )

    @staticmethod
    def all_parties():
        """
        get all parties

        Returns
        -------
        list
           list of parties

        """
        from fate_arch.session import get_parties

        return get_parties().all_parties

    @staticmethod
    def local_party():
        """
        indicate local party

        Returns
        -------
        Party
           party this program running on

        """
        from fate_arch.session import get_parties

        return get_parties().local_party

Methods

__init__(self, *args) special
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, *args):
    pass
__copy__(self) special
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    return self
__deepcopy__(self, memo) special
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    return self
set_flowid(flowid) staticmethod

set global namespace for federations.

Parameters

flowid: str namespace

Returns

None

Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def set_flowid(flowid):
    """
    set global namespace for federations.

    Parameters
    ----------
    flowid: str
       namespace

    Returns
    -------
    None

    """
    FederationTagNamespace.set_namespace(str(flowid))
all_parties() staticmethod

get all parties

Returns

list list of parties

Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def all_parties():
    """
    get all parties

    Returns
    -------
    list
       list of parties

    """
    from fate_arch.session import get_parties

    return get_parties().all_parties
local_party() staticmethod

indicate local party

Returns

Party party this program running on

Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def local_party():
    """
    indicate local party

    Returns
    -------
    Party
       party this program running on

    """
    from fate_arch.session import get_parties

    return get_parties().local_party

Last update: 2021-11-15