Pipeline Upload Data Tutorial
Install

Pipeline is distributed along with fate_client:

pip install fate_client
To use Pipeline, we first need to specify which FATE Flow Service to connect to. Once fate_client is installed, a command-line entry point named pipeline becomes available:
!pipeline --help
Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init    - DESCRIPTION: Pipeline Config Command.
Assume there is a FATE Flow Service at 127.0.0.1:9380 (the default in standalone mode); then execute:
!pipeline init --ip 127.0.0.1 --port 9380
Pipeline configuration succeeded.
Upload Data
Before starting a modeling task, the data to be used should be uploaded. Typically, a party is a cluster that includes multiple nodes, so uploaded data is allocated across those nodes.
import os

from pipeline.backend.pipeline import PipeLine
Make a pipeline instance:

- initiator:
    * role: guest
    * party: 9999
- roles:
    * guest: 9999

Note that only the local party id is needed.
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)
Define the number of partitions for data storage:
partition = 4
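FATE's storage engine decides internally how rows map to partitions; purely as an illustration of the idea (the helper `assign_partition` below is our own, not part of Pipeline), a deterministic hash-partition sketch looks like this:

```python
import zlib

def assign_partition(row_key: str, n_partitions: int) -> int:
    """Illustration only: map a row key to one of n_partitions buckets.

    FATE's real allocation is handled by its storage engine; crc32 is
    used here just to make the mapping deterministic across runs.
    """
    return zlib.crc32(row_key.encode("utf-8")) % n_partitions

partition = 4
# Every key lands in a bucket in [0, partition), and the same key
# always lands in the same bucket.
buckets = [assign_partition(f"id_{i}", partition) for i in range(8)]
```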
Define table names and namespaces, which will be used in the FATE job configuration:
dense_data_guest = {"name": "breast_hetero_guest", "namespace": "experiment"}
dense_data_host = {"name": "breast_hetero_host", "namespace": "experiment"}
tag_data = {"name": "breast_hetero_host", "namespace": "experiment"}
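When several tables share a namespace, the (name, namespace) pairs can also be built with a small helper; a sketch (the function `make_table` is our own, not a Pipeline API):

```python
def make_table(name: str, namespace: str = "experiment") -> dict:
    """Build the (name, namespace) pair that identifies an uploaded
    table in the FATE job configuration."""
    return {"name": name, "namespace": namespace}

dense_data_guest = make_table("breast_hetero_guest")
dense_data_host = make_table("breast_hetero_host")
```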
Now, add the data to be uploaded:
data_base = "/workspace/FATE/"
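A wrong file path only surfaces after the upload job is submitted, so it can be worth validating paths up front. A minimal sketch, assuming the file layout above (the helper `resolve_data_file` is ours, not part of Pipeline):

```python
import os

def resolve_data_file(data_base: str, relative_path: str) -> str:
    """Join a base directory and a relative path, failing early
    if the resulting data file does not exist."""
    path = os.path.join(data_base, relative_path)
    if not os.path.isfile(path):
        raise FileNotFoundError(f"expected data file not found: {path}")
    return path

# e.g. resolve_data_file("/workspace/FATE/", "examples/data/breast_hetero_guest.csv")
```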
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_guest.csv"),
table_name=dense_data_guest["name"], # table name
namespace=dense_data_guest["namespace"], # namespace
head=1, partition=partition) # data info
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_host.csv"),
table_name=dense_data_host["name"],
namespace=dense_data_host["namespace"],
head=1, partition=partition)
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_host.csv"),
table_name=tag_data["name"],
namespace=tag_data["namespace"],
head=1, partition=partition)
We can then upload the data. Setting drop=1 replaces any existing table with the same name and namespace:
pipeline_upload.upload(drop=1)
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2021-11-15 11:26:40.541 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202111151126388669570
2021-11-15 11:26:42.601 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:02
2021-11-15 11:26:49.260 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:08
2021-11-15 11:26:52.644 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202111151126388669570
2021-11-15 11:26:52.649 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:12
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2021-11-15 11:26:53.998 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202111151126526555290
2021-11-15 11:26:54.006 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:00
2021-11-15 11:27:00.984 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:06
2021-11-15 11:27:02.770 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202111151126526555290
2021-11-15 11:27:02.771 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:08
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2021-11-15 11:27:03.959 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202111151127027776050
2021-11-15 11:27:03.969 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:00
2021-11-15 11:27:11.018 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:07
2021-11-15 11:27:12.669 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202111151127027776050
2021-11-15 11:27:12.671 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:08
For more demos on using Pipeline to submit jobs, please refer to the pipeline examples.