跳到主要内容

使用 BulkWriter

如果您的数据格式不满足支持的数据格式中列出的各项要求,您可以使用 PyMivus 和 Milvus Java SDK 自带的 BulkWriter 对您的数据进行转换。

概述

BulkWriter 用于将原始数据转换成可以批量导入到目标 Collection 的格式,并在 Zilliz Cloud 控制台、Milvus SDK 的 BulkInsert 接口,以及 RESTful API 的 Import 接口中使用。目前,有如下两种 BulkWriter

  • LocalBulkWriter 读取指定数据集并将其转换为适用的格式。

  • RemoteBulkWriter 完成 LocalBulkWriter 的所有工作,并将转换后的文件上传到指定的远程对象存储桶中。

使用步骤

安装 PyMilvus

在终端中运行如下命令,安装 PyMilvus 或将其升级到最新版本。

python3 -m pip install --upgrade pymilvus

创建 Collection Schema

确定需要导入数据的目标 Collection 的 Schema。在此步骤中,您需要确定哪些字段需要被包含在 Schema 中。

下述代码在创建 Schema 时使用了所有可能的数据类型。另外,Schema 中还关闭了 AutoID 并开启了动态字段支持。

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

创建 BulkWriter

PyMilvus 中有两种 BulkWriter。在本小节中,我们将了解如何创建这两种 BulkWriter。

  • LocalBulkWriter

    LocalBulkWriter 将原始数据按行添加到缓存中,然后将缓存中的数据存入一个指定格式的本地文件中。

    from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
    # 如果您的集群和 Milvus 2.4.2 及之后版本兼容
    # 需使用 `from pymilvus import LocalBulkWriter, BulkFileType`

    writer = LocalBulkWriter(
    schema=schema,
    local_path='.',
    segment_size=512 * 1024 * 1024, # default value
    file_type=BulkFileType.PARQUET
    )

    在创建 LocalBulkWriter 时,您应该:

    • schema 参数中引用之前创建好的 CollectionSchema 对象。

    • local_path 中指定本地输出路径。

    • file_type 中指定输出文件格式。

    • 如果原始数据中包含大量的数据记录,可以考虑设置 segment_size 来调整原始数据分段大小。

    关于参数设置,可以参考 SDK Reference 中关于 LocalBulkWriter 的介绍。

    📘说明

    使用 LocalBulkWriter 生成的 JSON 文件可以直接在 Zilliz Cloud 控制台上导入。

    如需导入其它格式的文件,需要先将它们上传到和集群同云的对象存储桶中。

  • RemoteBulkWriter

    LocalBulkWriter 不同的是,RemoteBulkWriter 将缓存中的数据写入一个远程对象存储桶中。因此,您需要先设置好用于连接该存储桶的 ConnectParam 对象,并在创建 RemoteBulkWriter 时引用该 ConnectParam 对象。

    from pymilvus.bulk_writer import RemoteBulkWriter
    # Use `from pymilvus import RemoteBulkWriter`
    # when you use pymilvus earlier than 2.4.2

    # Third-party constants
    ACCESS_KEY="bucket-ak"
    SECRET_KEY="bucket-sk"
    BUCKET_NAME="a-bucket"

    # Connections parameters to access the remote bucket
    conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="s3.amazonaws.com", # use 'storage.googleapis.com' for Google Cloud Storage
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=True
    )

    from pymilvus.bulk_writer import BulkFileType
    # Use `from pymilvus import BulkFileType`
    # when you use pymilvus earlier than 2.4.2

    writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
    )

    print('bulk writer created.')

    在连接参数准备就绪后,就可以在 RemoteBulkWriter 中引用了。

    from pymilvus.bulk_writer import RemoteBulkWriter
    # Use `from pymilvus import RemoteBulkWriter`
    # when you use pymilvus earlier than 2.4.2

    writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
    )

除了 connect_param 参数外,RemoteBulkWriter 的参数与 LocalBulkWriter 基本相同。更多关于参数设置的信息,可以参考 SDK Reference 中关于 RemoteBulkWriter 的介绍。

开始转换

BulkWriter 对象提供两个方法:append_row() 将原始数据按行添加到缓存中,commit() 将缓存中的数据写入一个本地文件或远程对象存储桶中。

为了方便演示,下述代码向缓存中添加随机生成的数据。

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(10000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

动态字段支持

在上一节中,我们创建 BulkWriter 时引用了一个启用了动态字段的 Schema。因此,我们可以在使用 BulkWriter 向缓存中添加数据时携带 Schema 中未定义的字段。

为了方便演示,下述代码向缓存中添加随机生成的数据。

import random
import string

def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})

writer.commit()

验证结果

您可以通过打印 BulkWriterdata_path 属性来获取实际输出路径。

print(writer.data_path)

# LocalBulkWriter
# 'folder/45ae1139-1d87-4aff-85f5-0039111f9e6b'

# RemoteBulkWriter
# '/folder/5868ba87-743e-4d9e-8fa6-e07b39229425'

BulkWriter 生成一个 UUID,并使用该 UUID 在指定的输入路径下创建一个子路径,然后将生成的文件放在创建的子路径下。您也可以单击此处下载根据上述部署生成的示例数据文件。

生成的数据目录结构如下所示: