跳到主要内容
版本:Cloud 开发指南

快速开始:使用 Serving 集群

Serving Cluster 同时集成了计算和存储能力,用于实时生产服务。在通过 Extract-Transform-Load(ETL)流水线清洗数据之后,您可以将数据导入 Serving Cluster。

开始之前

如下步骤假设您已经创建了 Zilliz Cloud Serving 集群,获取了可以访问该集群的 API Key 或鉴权凭据,并安装了相关 SDK。

步骤 1:建立连接

获取集群凭证或 API 密钥后,您可以通过以下示例代码连接到集群。

from pymilvus import MilvusClient, DataType

SERVING_CLUSTER_ENDPOINT = "https://{cluster-id}.{region}.vectordb.zilliz.com.cn:19530"
TOKEN = "YOUR_CLUSTER_TOKEN"
# A valid token could be either
# - An API key, or
# - A colon-joined cluster username and password, as in \`user:pass\`

# 1. Set up a Milvus client
client = MilvusClient(
uri=SERVING_CLUSTER_ENDPOINT,
token=TOKEN
)

步骤 2:(可选)创建 Database

Serving Cluster 默认会自带一个 Default Database。如果您使用 Default Database,可以跳过此步骤。您也可以按如下方式创建 Database:

# connect to the serving cluster
client = MilvusClient(
# a cluster-specific endpoint
uri=SERVING_CLUSTER_ENDPOINT,
token=TOKEN
)

client.create_database(
db_name="my_database"
)

步骤 3:创建 Collection

Database 准备就绪后,您可以在其中创建 Managed Collection。与将 Collection 列映射到外部数据文件的 External Collection 不同,Managed Collection 需要您导入数据。

以下示例演示了如何定义 Collection Schema 并创建 Collection。

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema()

schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
is_primary=True
)

schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512
)

schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768
)

然后,您可以使用上述 Schema 创建一个 Collection。如果您决定使用 Default Database,可以直接省略 db_name 参数。

client.use_database(
db_name="my_database"
)

# create the collection
client.create_collection(
collection_name="prod_collection",
schema=schema
)

步骤 4:创建 Index

您需要为所有向量字段创建索引,并可按需为特定标量字段创建索引。

index_params = client.prepare_index_params()

# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)

index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)

client.create_index(
db_name="my_database",
collection_name="prod_collection",
index_params=index_params
)

步骤 5:加载 Collection

索引准备好后,将 Collection 加载到内存中。

client.load_collection(
db_name="my_database",
collection_name="prod_collection"
)

步骤 6:导入数据

完成上述设置后,您可以导入已处理的数据。以下示例假设您已将处理后的数据存储在外部存储桶中。

有关存储桶或存储集成中支持的数据格式,请参见支持的数据格式

from pymilvus.bulk_writer import bulk_import

# The path should be relative to the root
# of a zilliz cloud volume or an external storage
STORAGE_PATH = "oss://{bucket_name}/your/data/in/storage/"
ACCESS_KEY = "YOUR_STORAGE_ACCESS_KEY"
SECRET_KEY = "YOUR_STORAGE_SECRET_KEY"

res = bulk_import(
api_key="YOUR_ZILLIZ_API_KEY",
url="https://api.cloud.zilliz.com.cn",
project_id="proj-xxxxxxxxxxxxxxxxxxx",
region_id="ali-cn-hangzhou",
collection_name="prod_collection",
object_url="oss://{bucket_name}/you/data/in/storage.json",
access_key="YOUR_STORAGE_ACCESS_KEY",
secret_key="YOUR_STORAGE_SECRET_KEY"
)

# job-xxxxxxxxxxxxxxxxxxxxx

获得返回的 job ID 后,您可以监控导入进度。

import json
from pymilvus.bulk_writer import get_import_progress

# Get bulk-insert job progress
resp = get_import_progress(
api_key="YOUR_ZILLIZ_API_KEY",
url="https://api.cloud.zilliz.com.cn",
cluster_id="inxx-xxxxxxxxxxxxxxxxxxx",
job_id="job-xxxxxxxxxxxxxxxxxxxxx",
)

print(json.dumps(resp.json(), indent=4))

步骤 7:执行搜索

当您需要执行 Search、Query 或 Hybrid Search 时,可以通过 session 将请求附加到上一步创建的 On-demand 集群。

# highlight-start
session = client.session(
cluster_id="inxx-xxxxxxxxxxxxx"
)
# highlight-end

# 1536-dimensional vector
query_vector = [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, ..., 0.9029438446296592]
res = session.search(
db_name="my_database",
collection_name="my_collection",
anns_field="vector",
data=[query_vector],
limit=3,
output_fields=["product_id", "title", "main_category", "price", "average_rating", "rating_number"],
search_params={"metric_type": "COSINE"}
)

随后,您可以进一步探索数据并找出最有价值的数据子集。然后,您可以连接到 Serving Cluster,将这些数据导入其中,并用于生产环境服务。