Skip to content

Computing API

Most of the time, a federatedml user does not need to know how to initialize a computing session, because FATE Flow already handles this for you. The exception is when you are writing unit tests that involve CTable-related functions. To initialize a computing session:

from fate_arch.session import computing_session
# initialize
computing_session.init(session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)

computing session

computing_session

Source code in fate_arch/session/_session.py
class computing_session(object):
    """
    static helpers that create and use the computing session
    """

    @staticmethod
    def init(session_id, options=None):
        """
        build a ``Session``, call ``as_global()`` on it and initialize computing

        Parameters
        ----------
        session_id
           identifier passed to ``init_computing``
        options
           passed to the ``Session`` constructor, ``None`` by default
        """
        session = Session(options=options)
        session.as_global().init_computing(session_id)

    @staticmethod
    def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
        """
        create a table from iterable ``data`` using the current computing session

        Parameters
        ----------
        data: typing.Iterable
           data to build the table from
        partition: int
           forwarded as ``partition``
        include_key: bool
           forwarded as ``include_key``
        **kwargs
           forwarded unchanged to the computing session

        Returns
        -------
        CTableABC
           whatever the computing session's ``parallelize`` returns
        """
        session = get_computing_session()
        return session.parallelize(data, partition=partition, include_key=include_key, **kwargs)

    @staticmethod
    def stop():
        """
        stop the current computing session and return the result of its ``stop()``
        """
        session = get_computing_session()
        return session.stop()
init(session_id, options=None) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def init(session_id, options=None):
    """
    build a ``Session``, call ``as_global()`` on it and initialize computing

    Parameters
    ----------
    session_id
       identifier passed to ``init_computing``
    options
       passed to the ``Session`` constructor, ``None`` by default
    """
    session = Session(options=options)
    session.as_global().init_computing(session_id)
parallelize(data, partition, include_key, **kwargs) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
    """
    create a table from iterable ``data`` using the current computing session

    Parameters
    ----------
    data: typing.Iterable
       data to build the table from
    partition: int
       forwarded as ``partition``
    include_key: bool
       forwarded as ``include_key``
    **kwargs
       forwarded unchanged to the computing session

    Returns
    -------
    CTableABC
       whatever the computing session's ``parallelize`` returns
    """
    session = get_computing_session()
    return session.parallelize(data, partition=partition, include_key=include_key, **kwargs)
stop() staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def stop():
    """
    stop the current computing session and return the result of its ``stop()``
    """
    session = get_computing_session()
    return session.stop()

computing table

After creating a table with a computing session, many distributed computing APIs are available on it.

CTableABC

a table of pair-like data supports distributed processing

Source code in fate_arch/abc/_computing.py
class CTableABC(metaclass=ABCMeta):
    """
    a table of key-value pair data that supports distributed processing
    """

    @property
    @abc.abstractmethod
    def engine(self):
        """
        get the engine name of table

        Returns
        -------
        str
           engine name (NOTE(review): ``str`` is assumed from "name" -- confirm against implementations)
        """
        ...

    @property
    @abc.abstractmethod
    def partitions(self):
        """
        get the partitions of table

        Returns
        -------
        int
           number of partitions
        """
        ...

    @abc.abstractmethod
    def copy(self):
        """
        abstract method, no contract is documented here

        Notes
        -----
        NOTE(review): presumably returns a copy of this table -- confirm against concrete implementations
        """
        ...

    @abc.abstractmethod
    def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
        """
        save table

        Parameters
        ----------
        address: AddressABC
           address to save table to
        partitions: int
           number of partitions to save as
        schema: dict
           table schema
        **kwargs
           extra options; their meaning is not defined by this interface
        """
        ...

    @abc.abstractmethod
    def collect(self, **kwargs) -> typing.Generator:
        """
        collect data from table

        Returns
        -------
        generator
           generator of data; the examples in this class consume it as (key, value) pairs

        Notes
        ------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def take(self, n=1, **kwargs):
        """
        take ``n`` data from table

        Parameters
        ----------
        n: int
          number of data to take, 1 by default

        Returns
        -------
        list
           a list of ``n`` data

        Notes
        ------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def first(self, **kwargs):
        """
        take one data from table

        Returns
        -------
        object
          a data from table

        Notes
        -------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def count(self) -> int:
        """
        number of data in table

        Returns
        -------
        int
           number of data
        """
        ...

    @abc.abstractmethod
    def map(self, func) -> 'CTableABC':
        """
        apply `func` to each data

        Parameters
        ----------
        func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
           function map (k1, v1) to (k2, v2)

        Returns
        -------
        CTableABC
           A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
        >>> b = a.map(lambda k, v: (k, v**2))
        >>> list(b.collect())
        [('k1', 1), ('k2', 4), ('k3', 9)]
        """
        ...

    @abc.abstractmethod
    def mapValues(self, func):
        """
        apply `func` to each value of data

        Parameters
        ----------
        func: ``typing.Callable[[object], object]``
           map v1 to v2

        Returns
        -------
        CTableABC
           A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
        >>> b = a.mapValues(lambda x: len(x))
        >>> list(b.collect())
        [('a', 3), ('b', 1)]
        """
        ...

    @abc.abstractmethod
    def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
        """
        apply ``func`` to each partition of table

        Parameters
        ----------
        func: ``typing.Callable[[iter], list]``
           accept an iterator of pair, return a list of pair
        use_previous_behavior: bool
           provided for compatibility; if set True, calling this method will call ``applyPartitions`` instead
        preserves_partitioning: bool
           flag indicating whether ``func`` preserves partitioning

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
        >>> def f(iterator):
        ...     s = 0
        ...     for k, v in iterator:
        ...         s += v
        ...     return [(s, s)]
        ...
        >>> b = a.mapPartitions(f)
        >>> list(b.collect())
        [(6, 6), (9, 9)]
        """
        ...

    @abc.abstractmethod
    def mapReducePartitions(self, mapper, reducer, **kwargs):
        """
        apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`

        Parameters
        ----------
        mapper: ``typing.Callable[[iter], list]``
           accept an iterator of pair, return a list of pair
        reducer: ``typing.Callable[[object, object], object]``
           reduce v1, v2 to v3

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
        >>> def _mapper(it):
        ...     r = []
        ...     for k, v in it:
        ...         r.append((k % 3, v**2))
        ...         r.append((k % 2, v ** 3))
        ...     return r
        >>> def _reducer(a, b):
        ...     return a + b
        >>> output = table.mapReducePartitions(_mapper, _reducer)
        >>> collected = dict(output.collect())
        >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
        >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
        >>> assert collected[2] == 3 ** 2
        """

        ...

    def applyPartitions(self, func):
        """
        apply ``func`` to each partition as a single object

        Parameters
        ----------
        func: ``typing.Callable[[iter], object]``
           accept an iterator, return an object

        Returns
        -------
        CTableABC
           a new table, with each partition containing a single key-value pair

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
        >>> def f(it):
        ...    r = []
        ...    for k, v in it:
        ...        r.append((v, v**2, v**3))
        ...    return r
        >>> output = a.applyPartitions(f)
        >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]

        Notes
        -----
        unlike its neighbours this method is not decorated with ``abc.abstractmethod``
        """
        ...

    @abc.abstractmethod
    def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
        """
        abstract method, no contract is documented here

        Notes
        -----
        NOTE(review): presumably applies ``func`` to each partition together with the partition index,
        with ``preserves_partitioning`` meaning the same as in ``mapPartitions`` -- confirm against
        concrete implementations
        """
        ...

    @abc.abstractmethod
    def flatMap(self, func):
        """
        apply a flat ``func`` to each data of table

        Parameters
        ----------
        func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
           a flat function accepting two parameters (key, value) and returning a list of pairs

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
        >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
        >>> c = list(b.collect())
        >>> assert len(c) == 4
        >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
        """
        ...

    @abc.abstractmethod
    def reduce(self, func):
        """
        reduces all values in the pairs of table by a binary function `func`

        Parameters
        ----------
        func: typing.Callable[[object, object], object]
           binary function reducing two values into one

        Returns
        -------
        object
           a single object

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
        >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))

        Notes
        ------
        `func` should be associative
        """
        ...

    @abc.abstractmethod
    def glom(self):
        """
        coalesces all data within partition into a list

        Returns
        -------
        list
           list containing all coalesced partition and its elements.
           First element of each tuple is chosen from key of last element of each partition.

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
        >>> list(a)
        [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
        """
        ...

    @abc.abstractmethod
    def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
        """
        return a sampled subset of this Table.

        Parameters
        ----------
        fraction: float
          Expected size of the sample as a fraction of this table's size.
          Without replacement: probability that each element is chosen; fraction must be in [0, 1].
          With replacement: expected number of times each element is chosen.
        num: int
          Exact number of the sample from this table's size
        seed: int
          Seed of the random number generator. Use current timestamp when `None` is passed.

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
        >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
        True

        Notes
        -------
        use one of ``fraction`` and ``num``, not both; all parameters are keyword-only

        """
        ...

    @abc.abstractmethod
    def filter(self, func):
        """
        returns a new table containing only those keys which satisfy a predicate passed in via ``func``.

        Parameters
        ----------
        func: typing.Callable[[object, object], bool]
           Predicate function taking (key, value) and returning a boolean.

        Returns
        -------
        CTableABC
           A new table containing results.

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
        >>> b = a.filter(lambda k, v : k % 2 == 0)
        >>> list(b.collect())
        [(0, 0), (2, 2)]
        >>> c = a.filter(lambda k, v : v % 2 != 0)
        >>> list(c.collect())
        [(1, 1)]
        """
        ...

    @abc.abstractmethod
    def join(self, other, func):
        """
        returns intersection of this table and the other table.

        function ``func`` will be applied to values of keys that exist in both table.

        Parameters
        ----------
        other: CTableABC
          another table to be operated with.
        func: ``typing.Callable[[object, object], object]``
          the function applying to values whose key exists in both tables.
          NOTE(review): earlier text said "default using left table's value", but this
          abstract signature declares no default for ``func`` -- confirm against implementations.

        Returns
        -------
        CTableABC
          a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
        >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
        >>> c = a.join(b, lambda v1, v2 : v1 + v2)
        >>> list(c.collect())
        [(1, 3), (2, 5)]
        """
        ...

    @abc.abstractmethod
    def union(self, other, func=lambda v1, v2: v1):
        """
        returns union of this table and the other table.

        function ``func`` will be applied to values of keys that exist in both table.

        Parameters
        ----------
        other: CTableABC
          another table to be operated with.
        func: ``typing.Callable[[object, object], object]``
          The function applying to values whose key exists in both tables.
          The default returns its first argument (``lambda v1, v2: v1``), i.e. the left table's value.

        Returns
        -------
        CTableABC
          a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
        >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
        >>> c = a.union(b, lambda v1, v2 : v1 + v2)
        >>> list(c.collect())
        [(0, 1), (1, 3), (2, 5), (3, 3)]
        """
        ...

    @abc.abstractmethod
    def subtractByKey(self, other):
        """
        returns a new table containing elements only in this table but not in the other table.

        Parameters
        ----------
        other: CTableABC
          Another table whose keys are subtracted from this table.

        Returns
        -------
        CTableABC
          A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
        >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
        >>> c = a.subtractByKey(b)
        >>> list(c.collect())
        [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
        """
        ...

    @property
    def schema(self):
        """
        schema attached to this table

        Stored on the instance under ``_schema``; an empty dict is created and
        stored on first access if nothing has been set yet.
        """
        # lazily create the backing attribute so the getter never raises AttributeError
        if not hasattr(self, "_schema"):
            setattr(self, "_schema", {})
        return getattr(self, "_schema")

    @schema.setter
    def schema(self, value):
        # replace the stored schema with ``value``; no validation is performed
        setattr(self, "_schema", value)

Attributes

engine property readonly

get the engine name of table

Returns

the engine name of the table

partitions property readonly

get the partitions of table

Returns

int number of partitions

schema property writable

Methods

copy(self)
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def copy(self):
    """
    abstract method, no contract is documented here

    Notes
    -----
    NOTE(review): presumably returns a copy of this table -- confirm against concrete implementations
    """
    ...
save(self, address, partitions, schema, **kwargs)

save table

Parameters

address: AddressABC address to save table to partitions: int number of partitions to save as schema: dict table schema

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
    """
    save table

    Parameters
    ----------
    address: AddressABC
       address to save table to
    partitions: int
       number of partitions to save as
    schema: dict
       table schema
    **kwargs
       extra options; their meaning is not defined by this interface
    """
    ...
collect(self, **kwargs)

collect data from table

Returns

generator generator of data

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def collect(self, **kwargs) -> typing.Generator:
    """
    collect data from table

    Parameters
    ----------
    **kwargs
       extra options; their meaning is not defined by this interface

    Returns
    -------
    generator
       generator of data

    Notes
    ------
    no order guarantee
    """
    ...
take(self, n=1, **kwargs)

take n data from table

Parameters

n: int number of data to take

Returns

list a list of n data

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def take(self, n=1, **kwargs):
    """
    take ``n`` data from table

    Parameters
    ----------
    n: int
      number of data to take, 1 by default
    **kwargs
      extra options; their meaning is not defined by this interface

    Returns
    -------
    list
       a list of ``n`` data

    Notes
    ------
    no order guarantee
    """
    ...
first(self, **kwargs)

take one data from table

Returns

object a data from table

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def first(self, **kwargs):
    """
    take one data from table

    Parameters
    ----------
    **kwargs
      extra options; their meaning is not defined by this interface

    Returns
    -------
    object
      a data from table

    Notes
    -------
    no order guarantee
    """
    ...
count(self)

number of data in table

Returns

int number of data

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def count(self) -> int:
    """
    count the data in table

    Returns
    -------
    int
       number of data in the table
    """
    ...
map(self, func)

apply func to each data

Parameters

func: typing.Callable[[object, object], typing.Tuple[object, object]] function map (k1, v1) to (k2, v2)

Returns

CTableABC A new table

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2) b = a.map(lambda k, v: (k, v**2)) list(b.collect()) [("k1", 1), ("k2", 4), ("k3", 9)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def map(self, func) -> 'CTableABC':
    """
    apply `func` to each data

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
       function map (k1, v1) to (k2, v2)

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
    >>> b = a.map(lambda k, v: (k, v**2))
    >>> list(b.collect())
    [('k1', 1), ('k2', 4), ('k3', 9)]
    """
    ...
mapValues(self, func)

apply func to each value of data

Parameters

func: typing.Callable[[object], object] map v1 to v2

Returns

CTableABC A new table

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2) b = a.mapValues(lambda x: len(x)) list(b.collect()) [('a', 3), ('b', 1)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapValues(self, func):
    """
    apply `func` to each value of data, leaving keys as they are (see example)

    Parameters
    ----------
    func: ``typing.Callable[[object], object]``
       map v1 to v2

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
    >>> b = a.mapValues(lambda x: len(x))
    >>> list(b.collect())
    [('a', 3), ('b', 1)]
    """
    ...
mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False)

apply func to each partition of table

Parameters

func: typing.Callable[[iter], list] accept an iterator of pair, return a list of pair use_previous_behavior: bool this parameter is provided for compatible reason, if set True, call this func will call applyPartitions instead preserves_partitioning: bool flag indicate whether the func will preserve partition

Returns

CTableABC a new table

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2) def f(iterator): ... s = 0 ... for k, v in iterator: ... s += v ... return [(s, s)] ... b = a.mapPartitions(f) list(b.collect()) [(6, 6), (9, 9)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
    """
    apply ``func`` to each partition of table

    Parameters
    ----------
    func: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    use_previous_behavior: bool
       provided for compatibility; if set True, calling this method will call ``applyPartitions`` instead
    preserves_partitioning: bool
       flag indicating whether ``func`` preserves partitioning

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
    >>> def f(iterator):
    ...     s = 0
    ...     for k, v in iterator:
    ...         s += v
    ...     return [(s, s)]
    ...
    >>> b = a.mapPartitions(f)
    >>> list(b.collect())
    [(6, 6), (9, 9)]
    """
    ...
mapReducePartitions(self, mapper, reducer, **kwargs)

apply mapper to each partition of table and then perform reduce by key operation with reducer

Parameters

mapper: typing.Callable[[iter], list] accept an iterator of pair, return a list of pair reducer: typing.Callable[[object, object], object] reduce v1, v2 to v3

Returns

CTableABC a new table

Examples

from fate_arch.session import computing_session table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2) def _mapper(it): ... r = [] ... for k, v in it: ... r.append((k % 3, v**2)) ... r.append((k % 2, v ** 3)) ... return r def _reducer(a, b): ... return a + b output = table.mapReducePartitions(_mapper, _reducer) collected = dict(output.collect()) assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2 assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2 assert collected[2] == 3 ** 2

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapReducePartitions(self, mapper, reducer, **kwargs):
    """
    apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`

    Parameters
    ----------
    mapper: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    reducer: ``typing.Callable[[object, object], object]``
       reduce v1, v2 to v3
    **kwargs
       extra options; their meaning is not defined by this interface

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
    >>> def _mapper(it):
    ...     r = []
    ...     for k, v in it:
    ...         r.append((k % 3, v**2))
    ...         r.append((k % 2, v ** 3))
    ...     return r
    >>> def _reducer(a, b):
    ...     return a + b
    >>> output = table.mapReducePartitions(_mapper, _reducer)
    >>> collected = dict(output.collect())
    >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
    >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
    >>> assert collected[2] == 3 ** 2
    """

    ...
applyPartitions(self, func)

apply func to each partitions as a single object

Parameters

func: typing.Callable[[iter], object] accept a iterator, return a object

Returns

CTableABC a new table, with each partition contains a single key-value pair

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False) def f(it): ... r = [] ... for k, v in it: ... r.append((v, v**2, v**3)) ... return r output = a.applyPartitions(f) assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]

Source code in fate_arch/abc/_computing.py
def applyPartitions(self, func):
    """
    apply ``func`` to each partition as a single object

    Parameters
    ----------
    func: ``typing.Callable[[iter], object]``
       accept an iterator, return an object

    Returns
    -------
    CTableABC
       a new table, with each partition containing a single key-value pair

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
    >>> def f(it):
    ...    r = []
    ...    for k, v in it:
    ...        r.append((v, v**2, v**3))
    ...    return r
    >>> output = a.applyPartitions(f)
    >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
    """
    ...
mapPartitionsWithIndex(self, func, preserves_partitioning=False)
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
    """
    abstract method, no contract is documented here

    Notes
    -----
    NOTE(review): presumably applies ``func`` to each partition together with the partition index,
    and ``preserves_partitioning`` flags whether ``func`` keeps the partitioning -- confirm against
    concrete implementations
    """
    ...
flatMap(self, func)

apply a flat func to each data of table

Parameters

func: typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]] a flat function that accepts two parameters and returns a list of pairs

Returns

CTableABC a new table

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2) b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)]) c = list(b.collect()) assert len(c) == 4 assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def flatMap(self, func):
    """
    apply a flat ``func`` to each data of table

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
       a flat function accepting two parameters (key, value) and returning a list of pairs

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
    >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
    >>> c = list(b.collect())
    >>> assert len(c) == 4
    >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
    """
    ...
reduce(self, func)

reduces all value in pair of table by a binary function func

Parameters

func: typing.Callable[[object, object], object] binary function reduce two value into one

Returns

object a single object

Examples

from fate_arch.session import computing_session a = computing_session.parallelize(range(100), include_key=False, partition=4) assert a.reduce(lambda x, y: x + y) == sum(range(100))

Notes

func should be associative

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def reduce(self, func):
    """
    reduces all values in the pairs of table by a binary function `func`

    Parameters
    ----------
    func: typing.Callable[[object, object], object]
       binary function reducing two values into one

    Returns
    -------
    object
       a single object

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))

    Notes
    ------
    `func` should be associative
    """
    ...
glom(self)

coalesces all data within partition into a list

Returns

list list containing all coalesced partition and its elements. First element of each tuple is chosen from key of last element of each partition.

Examples

from fate_arch.session import computing_session a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect() list(a) [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def glom(self):
    """
    coalesces all data within each partition into a list

    Returns
    -------
    list
       list containing all coalesced partition and its elements.
       First element of each tuple is chosen from key of last element of each partition.
       In the example below the result is consumed through ``.collect()``.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
    >>> list(a)
    [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
    """
    ...
sample(self, *, fraction=None, num=None, seed=None)

return a sampled subset of this Table.

Parameters


fraction: float Expected size of the sample as a fraction of this table's size without replacement: probability that each element is chosen. Fraction must be [0, 1] with replacement: expected number of times each element is chosen. num: int Exact number of the sample from this table's size seed: int Seed of the random number generator. Use current timestamp when None is passed.

Returns

CTableABC a new table

Examples

from fate_arch.session import computing_session x = computing_session.parallelize(range(100), include_key=False, partition=4) 6 <= x.sample(fraction=0.1, seed=81).count() <= 14 True

Notes

use one of fraction and num, not both

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
    """
    return a sampled subset of this Table.

    Parameters
    ----------
    fraction: float
      Expected size of the sample as a fraction of this table's size.
      Without replacement: probability that each element is chosen; fraction must be in [0, 1].
      With replacement: expected number of times each element is chosen.
    num: int
      Exact number of the sample from this table's size
    seed: int
      Seed of the random number generator. Use current timestamp when `None` is passed.

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
    True

    Notes
    -------
    use one of ``fraction`` and ``num``, not both; all parameters are keyword-only

    """
    ...
filter(self, func)

returns a new table containing only those keys which satisfy a predicate passed in via func.

Parameters

func: typing.Callable[[object, object], bool] Predicate function returning a boolean.

Returns

CTableABC A new table containing results.

Examples

from fate_arch.session import computing_session a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2) b = a.filter(lambda k, v : k % 2 == 0) list(b.collect()) [(0, 0), (2, 2)] c = a.filter(lambda k, v : v % 2 != 0) list(c.collect()) [(1, 1)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def filter(self, func):
    """
    returns a new table containing only those keys which satisfy a predicate passed in via ``func``.

    Parameters
    ----------
    func: typing.Callable[[object, object], bool]
       Predicate function taking (key, value) and returning a boolean.

    Returns
    -------
    CTableABC
       A new table containing results.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
    >>> b = a.filter(lambda k, v : k % 2 == 0)
    >>> list(b.collect())
    [(0, 0), (2, 2)]
    >>> c = a.filter(lambda k, v : v % 2 != 0)
    >>> list(c.collect())
    [(1, 1)]
    """
    ...
join(self, other, func)

returns intersection of this table and the other table.

The function func is applied to the values of keys that exist in both tables.

Parameters

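other: CTableABC another table to be operated with.
func: typing.Callable[[object, object], object] the function applied to the values whose key exists in both tables. The text describes a default of using the left table's value, but the signature join(self, other, func) declares no default, so pass func explicitly.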

Returns

CTableABC a new table

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def join(self, other, func):
    """
    Return the intersection of this table and the other table.

    Only keys that exist in both tables are kept; ``func`` is applied to
    the two values of each such key to produce the value in the result.

    This is an abstract declaration: it fixes the signature and the
    documented contract only.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      the function applied to the values whose key exists in both tables.
      NOTE(review): this docstring used to mention a default of "using
      left table's value", but this signature declares no default for
      ``func`` -- confirm against the concrete implementations.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.join(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(1, 3), (2, 5)]
    """
    ...
union(self, other, func=lambda v1, v2: v1)

returns union of this table and the other table.

The function func is applied to the values of keys that exist in both tables.

Parameters

other: CTableABC another table to be operated with. func: typing.Callable[[object, object], object] The function applying to values whose key exists in both tables. default using left table's value.

Returns

CTableABC a new table

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def union(self, other, func=lambda v1, v2: v1):
    """
    Return the union of this table and the other table.

    Keys from both tables appear in the result. For a key that exists in
    both tables, ``func`` is applied to the two values to produce the
    value in the result.

    This is an abstract declaration: it fixes the signature and the
    documented contract only.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      The function applied to the values whose key exists in both tables.
      The default, ``lambda v1, v2: v1``, returns its first argument,
      i.e. it keeps the left table's value.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.union(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(0, 1), (1, 3), (2, 5), (3, 3)]
    """
    ...
subtractByKey(self, other)

returns a new table containing elements only in this table but not in the other table.

Parameters

other: CTableABC Another table; elements of this table that also appear in it are excluded from the result.

Returns

CTableABC A new table

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def subtractByKey(self, other):
    """
    Return a new table containing elements only in this table but not in the other table.

    This is an abstract declaration: it fixes the signature and the
    documented contract only.

    Parameters
    ----------
    other: CTableABC
      Another table; elements of this table that also appear in it are
      excluded from the result.
      NOTE(review): the method name suggests matching is done on keys
      only -- confirm against the concrete implementations.

    Returns
    -------
    CTableABC
      A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
    >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
    >>> c = a.subtractByKey(b)
    >>> list(c.collect())
    [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
    """
    ...

Last update: 2021-11-15