跳到主要内容

通过 SDK 导入

本节将帮助你了解如何使用 SDK 的 BulkWriter 和 BulkImport API 向 Collection 中导入数据。

另外,您还可以参考我们的数据导入指南。其中包含了数据准备和数据导入两个部分的内容。

安装依赖

在终端中运行以下命令安装 pymilvusminio 或将它们升级到最新的版本。

python3 -m pip install --upgrade pymilvus minio

检查已准备数据

在您使用 BulkWriter 完成数据准备工作后,你会获得一个路径,指向准备好的数据文件。您可以使用如下代码来检查这些数据文件。

from minio import Minio

# Third-party constants
YOUR_ACCESS_KEY = "YOUR_ACCESS_KEY"
YOUR_SECRET_KEY = "YOUR_SECRET_KEY"
YOUR_BUCKET_NAME = "YOUR_BUCKET_NAME"
YOUR_REMOTE_PATH = "YOUR_REMOTE_PATH"

client = Minio(
endpoint="oss-cn-hangzhou.aliyuncs.com",
# 腾讯云请使用 "cos.ap-beijing-1.myqcloud.com"
access_key=YOUR_ACCESS_KEY,
secret_key=YOUR_SECRET_KEY,
secure=True,
region='cn-hangzhou'
# 腾讯云请使用 region='ap-xxx'
)

objects = client.list_objects(
bucket_name=YOUR_BUCKET_NAME,
prefix=YOUR_REMOTE_PATH,
recursive=True
)

print([obj.object_name for obj in objects])

# Output
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]

导入数据

一旦您的数据和 Collection 准备就绪,您可以通过 Stage 或外部存储(如对象存储桶和块存储 Blob 容器)将数据导入到特定集合中。

从 Stage 中导入数据
内测版

如需从 Stage 中导入数据,需要先创建 Stage 并将数据上传至该 Stage 中。在完成这些步骤后,记录文件在 Stage 中的位置,以备调用数据导入接口时使用。更多内容,可以参考管理 Stage

您可以参考如下代码完成从 Stage 中导入数据的操作。

from pymilvus.bulk_writer import bulk_import

def cloud_bulkinsert():
# The value of the URL is fixed.
# For overseas regions, it is: https://api.cloud.zilliz.com
# For regions in China, it is: https://api.cloud.zilliz.com.cn
url = "https://api.cloud.zilliz.com"
api_key = ""
cluster_id = "inxx-xxxxxxxxxxxxxxx"
stage_name = "my-first-stage"
data_path = "dataPath"

print(f"\n===================== import files to cloud vectordb ====================")

resp = bulk_import(
url=url,
api_key=api_key,
cluster_id=cluster_id,
collection_name='quick_setup',
stage_name=stage_name,
data_paths=[[data_path]]
)
print(resp.json())

if __name__ == '__main__':
# # to call cloud bulkinsert api, you need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud)
cloud_bulkinsert()

从外部存储中导入数据

在待导入数据和 Collection 都准备就绪后,可以使用如下脚本将数据从外部存储导入到指定 Collection。

from pymilvus.bulk_writer import bulk_import

# Bulk-import your data from the prepared data files
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com.cn"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
STORAGE_URL = ""
ACCESS_KEY = ""
SECRET_KEY = ""

res = bulk_import(
url="api.cloud.zilliz.com.cn",
api_key=API_KEY,
object_url=OBJECT_URL,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
cluster_id=CLUSTER_ID, # Zilliz Cloud 集群 ID,如 "in01-xxxxxxxxxxxxxxx"
collection_name=COLLECTION_NAME
)

print(res.json())

# Output
#
# {
# "code": 200,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
📘说明

为了保证数据导入成功,请确认当前 Collection 下正在运行或待运行的任务数量不大于 10,000 条。

查看批量导入进度

可通过以下代码查看批量导入进度:

import json
from pymilvus.bulk_writer import get_import_progress

## Zilliz Cloud constants
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com.cn"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# Get bulk-insert job progress
resp = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
job_id="job-01fa0e5d42cjxudhpuehyp",
)

print(json.dumps(resp.json(), indent=4))

列出所有批量导入任务

您还可以调用 ListImportJobs API 来了解其它批量导入任务的运行情况:

import json
from pymilvus.bulk_writer import list_import_jobs

## Zilliz Cloud constants
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com.cn"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# List bulk-insert jobs
resp = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID
)

print(json.dumps(resp.json(), indent=4))