快速开始:External Data Lake Search公测版
按需搜索使您能够在无需持续运行计算资源的情况下,对存储在外部存储中的数据或已导入 Zilliz Cloud 的数据进行搜索。您可以基于 External Volume 或导入的文件创建 Collection,通过项目数据面 Endpoint 构建索引并 Refresh 元数据,并仅在需要执行 Search 或 Query 工作负载时启动 On-demand 集群。
您可以按以下流程操作:
开始之前
-
创建存储集成
存储集成是一种记录数据位置及访问凭证的配置对象。要设置存储集成,请按照步骤创建阿里云对象存储或 Amazon S3 集成,并获取其 ID。
-
创建 External Volume
External volume 是存储集成下的一个路径。请确保您的原始数据位于该路径中。您可以基于同一个存储集成创建多个 External Volume。详情请参见 External Volume。
步骤 1:连接项目 Endpoint
在操作 Database 之前,请先连接到项目 Endpoint。启用 Zilliz Cloud Console 中的按需计算后,您可以在快速开始页面获取项目 Endpoint。
External Collection 操作需要使用 API 密钥进行身份验证。此流程不支持 username:password 身份验证。
- Python
- cURL
# connect to database
client = MilvusClient(
# a project-specific on-demand compute endpoint
uri="https://{project-id}.{region}.api.cloud.zilliz.com.cn",
token="YOUR_API_KEY"
)
export PROJECT_ENDPOINT="https://{project-id}.{region}.api.cloud.zilliz.com.cn"
步骤 2:(可选)创建 Database
Zilliz Cloud 会默认提供一个 Default Database。如果您使用 Default Database,可以跳过此步骤。您也可以按如下方式创建 Database。
- Python
- cURL
client.create_database(
db_name="my_database"
)
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/databases/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"dbName": "my_database"
}'
步骤 3:创建 External Collection
Database 准备就绪后,您可以在其中创建 External Collection。External Collection 会将其列映射到您指定的数据文件,并为该 Collection 中的搜索附加按需计算资源。
与需要将原始数据导入 Collection 的 Managed Collection 不同,External Collection 通过亚秒级的 Refresh 操作从原始数据生成元数据。
以下示例演示了如何建立 Collection 字段与数据文件之间的映射关系。初始化 Schema 时,请传入数据的 Volume 路径和文件格式。
- Python
- cURL
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='volume://my_volume/iceberg/metadata/00001-xxx.metadata.json',
external_spec='{
"format": "iceberg-table",
"snapshot_id": "1234567890123456789"
}'
)
schema.add_field(
field_name="vector",
datatype=DataType.FLOAT_VECTOR,
dim=1536,
# highlight-next
external_field="embedding" # field name in the external data file
)
schema.add_field(
field_name="product_id",
datatype=DataType.VARCHAR,
max_length=32,
nullable=True,
# highlight-next
external_field="product_id"
)
schema.add_field(
field_name="title",
datatype=DataType.VARCHAR,
max_length=512,
nullable=True,
# highlight-next
external_field="title"
)
schema.add_field(
field_name="main_category",
datatype=DataType.VARCHAR,
max_length=64,
nullable=True,
# highlight-next
external_field="main_category"
)
schema.add_field(
field_name="price",
datatype=DataType.DOUBLE,
nullable=True,
# highlight-next
external_field="price"
)
schema.add_field(
field_name="average_rating",
datatype=DataType.DOUBLE,
nullable=True,
# highlight-next
external_field="average_rating"
)
schema.add_field(
field_name="rating_number",
datatype=DataType.INT64,
nullable=True,
# highlight-next
external_field="rating_number"
)
export schema='{
"externalSource": "volume://my_volume/iceberg/metadata/00001-xxx.metadata.json",
"externalSpec": "{\"format\": \"iceberg-table\", \"snapshot_id\": \"1234567890123456789\"}",
"fields": [
{
"fieldName": "vector",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "1536"
},
"externalField": "embedding"
},
{
"fieldName": "product_id",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": "32"
},
"nullable": true,
"externalField": "product_id"
},
{
"fieldName": "title",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": "512"
},
"nullable": true,
"externalField": "title"
},
{
"fieldName": "main_category",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": "64"
},
"nullable": true,
"externalField": "main_category"
},
{
"fieldName": "price",
"dataType": "Double",
"nullable": true,
"externalField": "price"
},
{
"fieldName": "average_rating",
"dataType": "Double",
"nullable": true,
"externalField": "average_rating"
},
{
"fieldName": "rating_number",
"dataType": "Int64",
"nullable": true,
"externalField": "rating_number"
}
]
}'
然后,您可以使用上述 schema 创建一个 Collection。如果您决定使用 Default Database,可以直接省略 db_name 参数。
- Python
- cURL
client.use_database(
db_name="my_database"
)
# create the collection
client.create_collection(
collection_name="my_collection",
schema=schema
)
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"my_collection\",
\"schema\": $schema
}"
步骤 4:创建索引并 Refresh Collection
像在 Managed Collection 中一样,您也可以为 External Collection 创建索引。所有向量字段都应创建索引,您也可以选择为部分标量字段创建索引,以加速元数据过滤。不过,您需要调用 Refresh 来真正构建索引。
- Python
- cURL
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="main_category",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="my_collection",
index_params=index_params
)
export indexParams='[
{
"fieldName": "vector",
"metricType": "COSINE",
"indexName": "vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "main_category",
"indexName": "main_category",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"my_collection\",
\"indexParams\": $indexParams
}"
然后 Refresh External Collection。您可以省略 externalSource 和 externalSpec 以复用 Collection Schema,也可以同时提供这两个参数,以基于新的数据源 Refresh Collection Schema。
- Python
- cURL
# refresh the external database
job_id = client.refresh_external_collection(
collection_name="my_collection"
)
# Refresh the external collection
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"dbName": "default",
"collectionName": "my_collection"
}'
# job-xxxxxxxxxxxxxxxxxxx
然后,您可以通过循环调用进度监控接口来跟踪 Refresh 操作的进度。
- Python
- cURL
progress = client.get_refresh_external_collection_progress(job_id=job_id)
curl -s --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/describe" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"jobId": "job-xxxxxxxxxxxxxxxxxxx"
}'
步骤 5:创建 On-demand 集群
External Collection 准备就绪后,您需要将其附加到 On-demand 集群,以执行按需搜索。以下命令将创建一个集群并返回其 ID。
export CONTROL_PLANE_ENDPOINT="https://api.cloud.zilliz.com.cn"
curl --request POST \
--url "${CONTROL_PLANE_ENDPOINT}/v2/clusters/createOnDemandCluster" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"projectId": "proj-xxxxxxxxxxxxxxxxxxx",
"regionId": "ali-cn-hangzhou",
"clusterName": "my-on-demand",
"cuSize": 8,
"autoSuspend": 60
}'
# inxx-xxxxxxxxxxxxx
步骤 6:执行搜索
当您需要执行 Search、Query 或 Hybrid Search 时,可以通过 session 将请求附加到上一步创建的 On-demand 集群。
- Python
- cURL
# highlight-start
session = client.session(
cluster_id="inxx-xxxxxxxxxxxxx"
)
# highlight-end
# 1536-dimensional vector
query_vector = [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, ..., 0.9029438446296592]
res = session.search(
db_name="my_database",
collection_name="my_collection",
anns_field="vector",
data=[query_vector],
limit=3,
output_fields=["product_id", "title", "main_category", "price", "average_rating", "rating_number"],
search_params={"metric_type": "COSINE"}
)
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/entities/search?cluster_id=inxx-xxxxxxxxxxxxxxxxx" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"dbName": "my_database",
"collectionName": "my_collection",
"data": [
[
0.3580376395471989,
-0.6023495712049978,
0.18414012509913835,
-0.26286205330961354,
0.9029438446296592
]
],
"annsField": "vector",
"limit": 3,
"outputFields": [
"product_id",
"title",
"main_category",
"price",
"average_rating",
"rating_number"
]
}'
随后,您可以进一步探索数据并找出最有价值的数据子集。然后,您可以连接到 Serving Cluster,将这些数据导入其中,并用于生产环境服务。