Pipeline Match ID Tutorial
Starting from version 1.7, FATE distinguishes between sample id (sid) and match id. A sid is unique to each sample entry, while a match id corresponds to the identity of the sample's source. This adaptation allows FATE to perform private set intersection on samples with repeated match ids. Users may choose to create sids by appending a uuid to the original sample entries at upload time; the DataTransform module will then extract the true match id for later use. This tutorial walks through a full upload-and-training process to demonstrate how to add and train with sid.
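As a rough, purely illustrative sketch (not FATE's actual implementation), extending the sid amounts to prepending a generated unique key to every row while the original id column is kept as the match id:

import csv
import uuid

# Hypothetical sketch of sid extension: prepend a generated, unique sid to each
# row of a plain CSV while the original id column is kept as the match id.
# FATE's extend_sid option performs this for you at upload time; this snippet
# only illustrates the idea.
def extend_sid(in_path, out_path):
    batch_key = uuid.uuid1().hex              # one key per upload batch
    with open(in_path) as fin, open(out_path, "w", newline="") as fout:
        reader, writer = csv.reader(fin), csv.writer(fout)
        header = next(reader)
        writer.writerow(["sid"] + header)     # original id stays as match id
        for i, row in enumerate(reader):
            writer.writerow([f"{batch_key}{i}"] + row)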
Install
Pipeline is distributed along with fate_client:
pip install fate_client
To use Pipeline, we first need to specify which FATE Flow Service to connect to. Once fate_client is installed, a command line entry point named pipeline becomes available:
!pipeline --help
Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init    - DESCRIPTION: Pipeline Config Command.
Assume we have a FATE Flow Service at 127.0.0.1:9380 (the default in standalone mode), then run:
!pipeline init --ip 127.0.0.1 --port 9380
Pipeline configuration succeeded.
Upload Data
Before starting a modeling task, the data to be used should be uploaded. Typically, a party is a cluster that includes multiple nodes; when data is uploaded, it is allocated across those nodes.
import os

from pipeline.backend.pipeline import PipeLine
Make a pipeline instance:
- initiator:
* role: guest
* party: 9999
- roles:
* guest: 9999
Note that only the local party id is needed.
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)
Define partitions for data storage
partition = 4
Define table name and namespace, which will be used in FATE job configuration
dense_data_guest = {"name": "breast_hetero_guest", "namespace": f"experiment"}
dense_data_host = {"name": "breast_hetero_host", "namespace": f"experiment"}
tag_data = {"name": "breast_hetero_host", "namespace": f"experiment"}
Now, we add the data to be uploaded. To create a uuid as the sid, turn on the extend_sid option. Alternatively, set auto_increasing_sid to make the extended sid start from 0.
data_base = "/workspace/FATE/"
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_guest.csv"),
table_name=dense_data_guest["name"], # table name
namespace=dense_data_guest["namespace"], # namespace
head=1, partition=partition, # data info
extend_sid=True, # extend sid
auto_increasing_sid=False)
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_host.csv"),
table_name=dense_data_host["name"],
namespace=dense_data_host["namespace"],
head=1, partition=partition,
extend_sid=True,
auto_increasing_sid=False)
We can then upload the data:
pipeline_upload.upload(drop=1)
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2021-12-31 03:27:14.912 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327142307260
2021-12-31 03:27:14.921 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:00
2021-12-31 03:27:15.452 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:19.088 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:04
2021-12-31 03:27:20.675 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310327142307260
2021-12-31 03:27:20.676 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:05
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2021-12-31 03:27:21.404 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327206816320
2021-12-31 03:27:23.987 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:02
2021-12-31 03:27:24.505 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:28.142 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:06
2021-12-31 03:27:29.690 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310327206816320
2021-12-31 03:27:29.691 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:08
After uploading, we can start modeling. Here we build a Hetero SecureBoost model the same way as in this demo, but note how the specification of the DataTransform module needs to be adjusted to correctly load in the match id.
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation
from pipeline.interface import Data
pipeline = PipeLine() \
.set_initiator(role='guest', party_id=9999) \
.set_roles(guest=9999, host=10000)
reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
table={"name": "breast_hetero_guest", "namespace": "experiment"})
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
table={"name": "breast_hetero_host", "namespace": "experiment"})
# set with match id
data_transform_0 = DataTransform(name="data_transform_0", with_match_id=True)
# set guest parameter
data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(
with_label=True)
data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(
with_label=False)
intersect_0 = Intersection(name="intersect_0")
hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
num_trees=5,
bin_num=16,
task_type="classification",
objective_param={"objective": "cross_entropy"},
encrypt_param={"method": "paillier"},
tree_param={"max_depth": 3})
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
Add components to pipeline, in order of execution:
- data_transform_0 consumes reader_0's output data
- intersect_0 consumes data_transform_0's output data
- hetero_secureboost_0 consumes intersect_0's output data
- evaluation_0 consumes hetero_secureboost_0's prediction result on training data
Then compile our pipeline to make it ready for submission.
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))
pipeline.compile();
Now, submit(fit) our pipeline:
pipeline.fit()
2021-12-31 03:27:32.837 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327304051900
2021-12-31 03:27:34.379 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:01
2021-12-31 03:27:35.420 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:39.091 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component reader_0, time elapse: 0:00:06
2021-12-31 03:27:41.739 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:46.111 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component data_transform_0, time elapse: 0:00:13
2021-12-31 03:27:48.749 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:55.018 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component intersect_0, time elapse: 0:00:22
2021-12-31 03:27:57.632 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:28:26.239 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component hetero_secureboost_0, time elapse: 0:00:53
2021-12-31 03:28:28.840 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:28:33.008 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component evaluation_0, time elapse: 0:01:00
2021-12-31 03:28:35.597 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310327304051900
2021-12-31 03:28:35.599 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:01:02
Check the data output on FATEBoard, or download the component output data to see that each data instance now has a uuid-based sid.
import json
print(json.dumps(pipeline.get_component("data_transform_0").get_output_data(limits=3), indent=4))
{ "data": [ "sid,inst_id,label,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9", "8e46168869e911ec87ab8eff97a1caf50,133,1,0.254879,-1.046633,0.209656,0.074214,-0.441366,-0.377645,-0.485934,0.347072,-0.28757,-0.733474", "8e46168869e911ec87ab8eff97a1caf51,273,1,-1.142928,-0.781198,-1.166747,-0.923578,0.62823,-1.021418,-1.111867,-0.959523,-0.096672,-0.121683" ], "meta": [ "sid", "inst_id", "label", "x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9" ] }
For more demos on using Pipeline to submit jobs, please refer to the pipeline demos, which include other pipeline examples using data with a match id.
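As a possible next step (a sketch based on common FATE Pipeline usage, not part of this tutorial), the fitted pipeline could be deployed and used for prediction on data that also carries a match id:

# Sketch: deploy the trained components and run offline prediction
# (assumes the standard deploy_component / predict interface of FATE Pipeline)
pipeline.deploy_component([data_transform_0, intersect_0, hetero_secureboost_0])

predict_pipeline = PipeLine()
# reuse the reader defined above
predict_pipeline.add_component(reader_0)
# feed the reader's output into the deployed pipeline's data_transform input
predict_pipeline.add_component(
    pipeline,
    data=Data(predict_input={pipeline.data_transform_0.input.data: reader_0.output.data}))
predict_pipeline.predict()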