Federation API¶
Low level api¶
__all__
special
¶
Classes¶
FederationABC
¶
Federation interface: get objects and tables from other parties, or remote (send) objects and tables to them.
Source code in fate_arch/abc/_federation.py
class FederationABC(metaclass=ABCMeta):
    """
    Abstract federation interface: get objects/tables from other parties,
    or remote (send) objects/tables to them.
    """

    @abc.abstractmethod
    def get(self, name: str,
            tag: str,
            parties: typing.List[Party],
            gc: GarbageCollectionABC) -> typing.List:
        """
        get objects/tables from ``parties``

        Parameters
        ----------
        name: str
            name of transfer variable
        tag: str
            tag to distinguish each transfer
        parties: typing.List[Party]
            parties to get objects/tables from
        gc: GarbageCollectionABC
            used to do some clean jobs

        Returns
        -------
        list
            a list of objects or a list of tables got from the parties,
            in the same order as ``parties``
        """
        ...

    @abc.abstractmethod
    def remote(self, v,
               name: str,
               tag: str,
               parties: typing.List[Party],
               gc: GarbageCollectionABC) -> None:
        """
        remote object/table to ``parties``

        Parameters
        ----------
        v: object or table
            object/table to remote
        name: str
            name of transfer variable
        tag: str
            tag to distinguish each transfer
        parties: typing.List[Party]
            parties to remote object/table to
        gc: GarbageCollectionABC
            used to do some clean jobs

        Returns
        -------
        None
        """
        # NOTE: annotated ``None`` rather than ``typing.NoReturn``; NoReturn
        # marks a function that never returns, which is not this contract.
        ...
Methods¶
get(self, name, tag, parties, gc)
¶
get objects/tables from parties
Parameters¶
name (str): name of transfer variable; tag (str): tag to distinguish each transfer; parties (typing.List[Party]): parties to get objects/tables from; gc (GarbageCollectionABC): used to do some clean-up jobs
Returns¶
list
a list of objects or a list of tables got from the parties, in the same order as parties
Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def get(
    self,
    name: str,
    tag: str,
    parties: typing.List[Party],
    gc: GarbageCollectionABC,
) -> typing.List:
    """
    Get objects/tables from ``parties``.

    Parameters
    ----------
    name: str
        name of transfer variable
    tag: str
        tag to distinguish each transfer
    parties: typing.List[Party]
        parties to get objects/tables from
    gc: GarbageCollectionABC
        used to do some clean jobs

    Returns
    -------
    list
        a list of objects or a list of tables got from the parties,
        in the same order as ``parties``
    """
    ...
remote(self, v, name, tag, parties, gc)
¶
remote object/table to parties
Parameters¶
v (object or table): object/table to remote; name (str): name of transfer variable; tag (str): tag to distinguish each transfer; parties (typing.List[Party]): parties to remote object/table to; gc (GarbageCollectionABC): used to do some clean-up jobs
Returns¶
Notes
Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def remote(self, v,
           name: str,
           tag: str,
           parties: typing.List[Party],
           gc: GarbageCollectionABC) -> None:
    """
    remote object/table to ``parties``

    Parameters
    ----------
    v: object or table
        object/table to remote
    name: str
        name of transfer variable
    tag: str
        tag to distinguish each transfer
    parties: typing.List[Party]
        parties to remote object/table to
    gc: GarbageCollectionABC
        used to do some clean jobs

    Returns
    -------
    None
    """
    # NOTE: annotated ``None`` rather than ``typing.NoReturn``; NoReturn
    # marks a function that never returns, which is not this contract.
    ...
user api¶
Remoting (sending) an object or table to other parties, or getting one from them, is straightforward using the APIs provided by Variable.
First, create an instance of BaseTransferVariables, which is simply a collection of Variables:
from federatedml.transfer_variable.transfer_class import secure_add_example_transfer_variable
variable = secure_add_example_transfer_variable.SecureAddExampleTransferVariable()
Then remote or get an object (or table) through a variable provided by this instance:
# remote
variable.guest_share.remote("from guest")
# get
variable.guest_share.get()
LOGGER
¶
__all__
special
¶
Classes¶
FederationTagNamespace
¶
Source code in fate_arch/federation/transfer_variable.py
class FederationTagNamespace(object):
    """Class-level namespace used as the leading component of federation tags."""

    # Name-mangled class attribute; "default" until set_namespace replaces it.
    __namespace = "default"

    @classmethod
    def set_namespace(cls, namespace):
        """Store ``namespace`` on the class as the current tag namespace."""
        cls.__namespace = namespace

    @classmethod
    def generate_tag(cls, *suffix):
        """Return the namespace followed by ``str()`` of each suffix item, joined by "."."""
        parts = [cls.__namespace]
        parts.extend(str(item) for item in suffix)
        return ".".join(parts)
set_namespace(namespace)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def set_namespace(cls, namespace):
    """Store ``namespace`` on the class as the current tag namespace."""
    cls.__namespace = namespace
generate_tag(*suffix)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def generate_tag(cls, *suffix):
    """Return the class namespace followed by ``str()`` of each suffix item, joined by "."."""
    parts = [cls.__namespace]
    parts.extend(str(item) for item in suffix)
    return ".".join(parts)
Variable
¶
variable to distinguish federation by name
Source code in fate_arch/federation/transfer_variable.py
class Variable(object):
    """
    variable to distinguish federation by name
    """

    # Registry of variables keyed by full name. ``get_or_create`` fills it and
    # ``__init__`` consults it to reject a second instance with the same name.
    __instances: typing.MutableMapping[str, "Variable"] = {}

    @classmethod
    def get_or_create(
        cls, name, create_func: typing.Callable[[], "Variable"]
    ) -> "Variable":
        """
        Return the variable registered under ``name``; when none is registered,
        call ``create_func`` once, register its result and return it.
        """
        if name not in cls.__instances:
            value = create_func()
            cls.__instances[name] = value
        return cls.__instances[name]

    def __init__(
        self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
    ):
        """
        Parameters
        ----------
        name: str
            full name with at least three dot-separated parts
            (``module_name.class_name.variable_name``)
        src: tuple of str
            roles allowed to remote with this variable
        dst: tuple of str
            roles allowed to get with this variable

        Raises
        ------
        RuntimeError
            if ``name`` is already present in the instance registry
        AssertionError
            if ``name`` has fewer than three dot-separated parts
        """
        if name in self.__instances:
            raise RuntimeError(
                f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
            )
        assert (
            len(name.split(".")) >= 3
        ), "incorrect name format, should be `module_name.class_name.variable_name`"
        self._name = name
        self._src = src
        self._dst = dst
        # separate collectors for the get side and the remote side
        self._get_gc = IterationGC()
        self._remote_gc = IterationGC()
        self._use_short_name = True
        self._short_name = self._get_short_name(self._name)

    @staticmethod
    def _get_short_name(name):
        """Return ``hash.<20 hex chars of blake2b(name)>.<last dot-separated part of name>``."""
        fix_sized = hashlib.blake2b(name.encode("utf-8"), digest_size=10).hexdigest()
        _, right = name.rsplit(".", 1)
        return f"hash.{fix_sized}.{right}"

    # copy never create a new instance
    def __copy__(self):
        return self

    # deepcopy never create a new instance
    def __deepcopy__(self, memo):
        return self

    def set_preserve_num(self, n):
        """Set capacity ``n`` on both the get and the remote collectors; returns ``self``."""
        self._get_gc.set_capacity(n)
        self._remote_gc.set_capacity(n)
        return self

    def disable_auto_clean(self):
        """Call ``disable()`` on both the get and the remote collectors; returns ``self``."""
        self._get_gc.disable()
        self._remote_gc.disable()
        return self

    def clean(self):
        """Call ``clean()`` on both the get and the remote collectors."""
        self._get_gc.clean()
        self._remote_gc.clean()

    def remote_parties(
        self,
        obj,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        remote object to specified parties

        Parameters
        ----------
        obj: object or table
            object or table to remote
        parties: typing.List[Party] or Party
            parties to remote object/table to; a single ``Party`` is wrapped in a list
        suffix: str or tuple of str
            suffix used to distinguish federation within variable;
            a non-tuple value is wrapped in a one-element tuple

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            if the role of any target party is not in this variable's ``dst`` roles,
            or the local party's role is not in its ``src`` roles
        """
        from fate_arch.session import get_session

        session = get_session()
        if isinstance(parties, Party):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._dst:
                raise RuntimeError(
                    f"not allowed to remote object to {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._src:
            raise RuntimeError(
                f"not allowed to remote object from {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name

        timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
        session.federation.remote(
            v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
        )
        timer.done(session.federation)

        self._remote_gc.gc()

    def get_parties(
        self,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        get objects/tables from specified parties

        Parameters
        ----------
        parties: typing.List[Party] or Party
            parties to get objects/tables from; any non-list value is wrapped in a list
        suffix: str or tuple of str
            suffix used to distinguish federation within variable;
            a non-tuple value is wrapped in a one-element tuple

        Returns
        -------
        list
            a list of objects/tables got from the parties, in the same order as ``parties``

        Raises
        ------
        RuntimeError
            if the role of any source party is not in this variable's ``src`` roles,
            or the local party's role is not in its ``dst`` roles
        """
        from fate_arch.session import get_session

        session = get_session()
        if not isinstance(parties, list):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._src:
                raise RuntimeError(
                    f"not allowed to get object from {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._dst:
            raise RuntimeError(
                f"not allowed to get object to {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name
        timer = profile.federation_get_timer(name, self._name, tag, local, parties)
        rtn = session.federation.get(
            name=name, tag=tag, parties=parties, gc=self._get_gc
        )
        timer.done(session.federation)

        self._get_gc.gc()

        return rtn

    def remote(self, obj, role=None, idx=-1, suffix=tuple()):
        """
        send obj to other parties.

        Args:
            obj: object to be sent
            role: role of parties to send to, use one of ['Host', 'Guest', 'Arbiter', None].
                The default is None, which means the parties are looked up from this
                variable's ``dst`` roles
            idx: index of the party to send to within the parties of ``role``.
                The default is -1, which means send to all selected parties
            suffix: additional tag suffix, the default is tuple()

        Raises:
            ValueError: if ``idx >= 0`` while ``role`` is None
            RuntimeError: if ``idx`` is not smaller than the number of selected parties
        """
        from fate_arch.session import get_parties

        party_info = get_parties()
        if idx >= 0 and role is None:
            raise ValueError("role cannot be None if idx specified")

        # get subset of dst roles in runtime conf
        if role is None:
            parties = party_info.roles_to_parties(self._dst, strict=False)
        else:
            if isinstance(role, str):
                role = [role]
            parties = party_info.roles_to_parties(role)

        if idx >= 0:
            if idx >= len(parties):
                raise RuntimeError(
                    f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
                )
            parties = parties[idx]
        return self.remote_parties(obj=obj, parties=parties, suffix=suffix)

    def get(self, idx=-1, role=None, suffix=tuple()):
        """
        get obj from other parties.

        Args:
            idx: index (int) or list of indexes into the source parties to get from.
                The default is -1; any negative int means get from all source parties
            role: role name or list of role names whose parties are used as sources.
                The default is None, which means this variable's ``src`` roles are used
            suffix: additional tag suffix, the default is tuple()

        Returns:
            a single object when ``idx`` is a non-negative int, otherwise a list of objects

        Raises:
            RuntimeError: if ``idx`` is an int not smaller than the number of source parties
            ValueError: if ``idx`` is neither an int nor a list
        """
        from fate_arch.session import get_parties

        if role is None:
            src_parties = get_parties().roles_to_parties(roles=self._src, strict=False)
        else:
            if isinstance(role, str):
                role = [role]
            src_parties = get_parties().roles_to_parties(roles=role, strict=False)
        if isinstance(idx, list):
            rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
        elif isinstance(idx, int):
            if idx < 0:
                rtn = self.get_parties(parties=src_parties, suffix=suffix)
            else:
                if idx >= len(src_parties):
                    raise RuntimeError(
                        f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                    )
                # get_parties returns a one-element list here; unwrap it
                rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
        else:
            raise ValueError(
                f"illegal idx type: {type(idx)}, supported types: int or list of int"
            )
        return rtn
Methods¶
get_or_create(name, create_func)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def get_or_create(
    cls, name, create_func: typing.Callable[[], "Variable"]
) -> "Variable":
    """
    Return the variable registered under ``name``; when none is registered,
    call ``create_func`` once, register its result and return it.
    """
    registry = cls.__instances
    if name in registry:
        return registry[name]
    created = create_func()
    registry[name] = created
    return created
__init__(self, name, src, dst)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __init__(
    self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
):
    """
    Initialise a variable named ``name`` with source roles ``src`` and
    destination roles ``dst``.

    Raises RuntimeError when ``name`` is already in the instance registry and
    AssertionError when ``name`` has fewer than three dot-separated parts.
    """
    if name in self.__instances:
        raise RuntimeError(
            f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
        )
    part_count = len(name.split("."))
    assert (
        part_count >= 3
    ), "incorrect name format, should be `module_name.class_name.variable_name`"

    self._name = name
    self._src, self._dst = src, dst
    # one collector per direction
    self._get_gc = IterationGC()
    self._remote_gc = IterationGC()
    self._use_short_name = True
    self._short_name = self._get_short_name(name)
__copy__(self)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    """Shallow copy hands back this very instance; no new object is created."""
    return self
__deepcopy__(self, memo)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    """Deep copy hands back this very instance; ``memo`` is unused."""
    return self
set_preserve_num(self, n)
¶
Source code in fate_arch/federation/transfer_variable.py
def set_preserve_num(self, n):
    """Set capacity ``n`` on the get collector, then on the remote collector; returns ``self``."""
    for collector in (self._get_gc, self._remote_gc):
        collector.set_capacity(n)
    return self
disable_auto_clean(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def disable_auto_clean(self):
    """Call ``disable()`` on the get collector, then on the remote collector; returns ``self``."""
    for collector in (self._get_gc, self._remote_gc):
        collector.disable()
    return self
clean(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    """Call ``clean()`` on the get collector, then on the remote collector."""
    for collector in (self._get_gc, self._remote_gc):
        collector.clean()
remote_parties(self, obj, parties, suffix=())
¶
remote object to specified parties
Parameters¶
obj (object or table): object or table to remote; parties (typing.List[Party]): parties to remote object/table to; suffix (str or tuple of str): suffix used to distinguish federation within variable
Returns¶
None
Source code in fate_arch/federation/transfer_variable.py
def remote_parties(
    self,
    obj,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    Remote (send) an object or table to the given parties.

    Parameters
    ----------
    obj: object or table
        object or table to remote
    parties: typing.List[Party] or Party
        parties to remote object/table to; a single ``Party`` is wrapped in a list
    suffix: str or tuple of str
        suffix used to distinguish federation within variable;
        a non-tuple value is wrapped in a one-element tuple

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        if the role of any target party is not in this variable's ``dst`` roles,
        or the local party's role is not in its ``src`` roles
    """
    from fate_arch.session import get_session

    session = get_session()

    # normalise arguments
    parties = [parties] if isinstance(parties, Party) else parties
    suffix = suffix if isinstance(suffix, tuple) else (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    # permission checks: targets first, then the local sender
    allowed_targets = self._dst
    for party in parties:
        if party.role not in allowed_targets:
            raise RuntimeError(
                f"not allowed to remote object to {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._src:
        raise RuntimeError(
            f"not allowed to remote object from {local} using {self._name}"
        )

    if self._use_short_name:
        name = self._short_name
    else:
        name = self._name

    timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
    session.federation.remote(
        v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
    )
    timer.done(session.federation)

    self._remote_gc.gc()
get_parties(self, parties, suffix=())
¶
get objects/tables from specified parties
Parameters¶
parties (typing.List[Party]): parties to get objects/tables from; suffix (str or tuple of str): suffix used to distinguish federation within variable
Returns¶
list
a list of objects/tables got from the parties, in the same order as parties
Source code in fate_arch/federation/transfer_variable.py
def get_parties(
    self,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    get objects/tables from specified parties

    Parameters
    ----------
    parties: typing.List[Party] or Party
        parties to get objects/tables from; any non-list value is wrapped in a list
    suffix: str or tuple of str
        suffix used to distinguish federation within variable;
        a non-tuple value is wrapped in a one-element tuple

    Returns
    -------
    list
        a list of objects/tables got from the parties, in the same order as ``parties``

    Raises
    ------
    RuntimeError
        if the role of any source party is not in this variable's ``src`` roles,
        or the local party's role is not in its ``dst`` roles
    """
    from fate_arch.session import get_session

    session = get_session()
    if not isinstance(parties, list):
        parties = [parties]
    if not isinstance(suffix, tuple):
        suffix = (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    # permission checks: sources first, then the local receiver
    for party in parties:
        if party.role not in self._src:
            raise RuntimeError(
                f"not allowed to get object from {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._dst:
        raise RuntimeError(
            f"not allowed to get object to {local} using {self._name}"
        )

    name = self._short_name if self._use_short_name else self._name
    timer = profile.federation_get_timer(name, self._name, tag, local, parties)
    rtn = session.federation.get(
        name=name, tag=tag, parties=parties, gc=self._get_gc
    )
    timer.done(session.federation)

    self._get_gc.gc()

    return rtn
remote(self, obj, role=None, idx=-1, suffix=())
¶
send obj to other parties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
object to be sent |
required | |
role |
role of parties to send to, one of ['Host', 'Guest', 'Arbiter', None]. The default is None, which means send values to parties regardless of their party role |
None |
|
idx |
index of the party to send to among the parties of the given role. The default is -1, which means send values to all selected parties regardless of their party id |
-1 |
|
suffix |
additional tag suffix, the default is tuple() |
() |
Source code in fate_arch/federation/transfer_variable.py
def remote(self, obj, role=None, idx=-1, suffix=tuple()):
    """
    Send ``obj`` to other parties.

    Args:
        obj: object to be sent
        role: role of parties to send to, use one of ['Host', 'Guest', 'Arbiter', None].
            The default is None, which means the parties are looked up from this
            variable's ``dst`` roles
        idx: index of the party to send to within the parties of ``role``.
            The default is -1, which means send to all selected parties
        suffix: additional tag suffix, the default is tuple()

    Raises:
        ValueError: if ``idx >= 0`` while ``role`` is None
        RuntimeError: if ``idx`` is not smaller than the number of selected parties
    """
    from fate_arch.session import get_parties

    party_info = get_parties()
    single_target = idx >= 0
    if single_target and role is None:
        raise ValueError("role cannot be None if idx specified")

    if role is not None:
        if isinstance(role, str):
            role = [role]
        parties = party_info.roles_to_parties(role)
    else:
        # get subset of dst roles in runtime conf
        parties = party_info.roles_to_parties(self._dst, strict=False)

    if single_target:
        if idx >= len(parties):
            raise RuntimeError(
                f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
            )
        parties = parties[idx]
    return self.remote_parties(obj=obj, parties=parties, suffix=suffix)
get(self, idx=-1, role=None, suffix=())
¶
get obj from other parties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx |
index of the party to get from among the source parties (an int or a list of ints). The default is -1, which means get values from all source parties regardless of their party id |
-1 |
|
suffix |
additional tag suffix, the default is tuple() |
() |
Returns:
Type | Description |
---|---|
object or list of object |
Source code in fate_arch/federation/transfer_variable.py
def get(self, idx=-1, role=None, suffix=tuple()):
    """
    get obj from other parties.

    Args:
        idx: index (int) or list of indexes into the source parties to get from.
            The default is -1; any negative int means get from all source parties
        role: role name or list of role names whose parties are used as sources.
            The default is None, which means this variable's ``src`` roles are used
        suffix: additional tag suffix, the default is tuple()

    Returns:
        a single object when ``idx`` is a non-negative int, otherwise a list of objects

    Raises:
        RuntimeError: if ``idx`` is an int not smaller than the number of source parties
        ValueError: if ``idx`` is neither an int nor a list
    """
    from fate_arch.session import get_parties

    if role is None:
        src_parties = get_parties().roles_to_parties(roles=self._src, strict=False)
    else:
        if isinstance(role, str):
            role = [role]
        src_parties = get_parties().roles_to_parties(roles=role, strict=False)
    if isinstance(idx, list):
        rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
    elif isinstance(idx, int):
        if idx < 0:
            rtn = self.get_parties(parties=src_parties, suffix=suffix)
        else:
            if idx >= len(src_parties):
                raise RuntimeError(
                    f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                )
            # get_parties returns a one-element list here; unwrap it
            rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
    else:
        raise ValueError(
            f"illegal idx type: {type(idx)}, supported types: int or list of int"
        )
    return rtn
BaseTransferVariables
¶
Source code in fate_arch/federation/transfer_variable.py
class BaseTransferVariables(object):
    """Base class for collections of transfer ``Variable`` objects."""

    def __init__(self, *args):
        """Accept and ignore any positional arguments."""
        pass

    def __copy__(self):
        """Shallow copy hands back this very instance."""
        return self

    def __deepcopy__(self, memo):
        """Deep copy hands back this very instance; ``memo`` is unused."""
        return self

    @staticmethod
    def set_flowid(flowid):
        """
        Set the global namespace for federations.

        Parameters
        ----------
        flowid: str
            namespace; passed through ``str()`` before being stored

        Returns
        -------
        None
        """
        namespace = str(flowid)
        FederationTagNamespace.set_namespace(namespace)

    def _create_variable(
        self, name: str, src: typing.Iterable[str], dst: typing.Iterable[str]
    ) -> Variable:
        """
        Return the ``Variable`` registered as ``<module>.<class name>.<name>``,
        creating it with ``src``/``dst`` converted to tuples when it does not exist yet.
        """
        full_name = f"{self.__module__}.{self.__class__.__name__}.{name}"

        def _build() -> Variable:
            return Variable(name=full_name, src=tuple(src), dst=tuple(dst))

        return Variable.get_or_create(full_name, _build)

    @staticmethod
    def all_parties():
        """
        Get all parties.

        Returns
        -------
        list
            list of parties
        """
        from fate_arch.session import get_parties

        party_info = get_parties()
        return party_info.all_parties

    @staticmethod
    def local_party():
        """
        Indicate the local party.

        Returns
        -------
        Party
            party this program is running on
        """
        from fate_arch.session import get_parties

        party_info = get_parties()
        return party_info.local_party
Methods¶
__init__(self, *args)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, *args):
    """Accept and ignore any positional arguments."""
    pass
__copy__(self)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    """Shallow copy hands back this very instance; no new object is created."""
    return self
__deepcopy__(self, memo)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    """Deep copy hands back this very instance; ``memo`` is unused."""
    return self
set_flowid(flowid)
staticmethod
¶
set global namespace for federations.
Parameters¶
flowid: str namespace
Returns¶
None
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def set_flowid(flowid):
    """
    Set the global namespace for federations.

    Parameters
    ----------
    flowid: str
        namespace; passed through ``str()`` before being stored

    Returns
    -------
    None
    """
    namespace = str(flowid)
    FederationTagNamespace.set_namespace(namespace)
all_parties()
staticmethod
¶
get all parties
Returns¶
list list of parties
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def all_parties():
    """
    Get all parties.

    Returns
    -------
    list
        list of parties
    """
    from fate_arch.session import get_parties

    party_info = get_parties()
    return party_info.all_parties
local_party()
staticmethod
¶
indicate local party
Returns¶
Party party this program running on
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def local_party():
    """
    Indicate the local party.

    Returns
    -------
    Party
        party this program is running on
    """
    from fate_arch.session import get_parties

    party_info = get_parties()
    return party_info.local_party