RemoteBulkWriter
A RemoteBulkWriter instance converts your raw data into a format that Milvus understands and uploads the resulting data files to remote storage: an AWS S3-compatible bucket or an Azure Blob Storage container.
RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam)
Methods of RemoteBulkWriter:
Method | Description | Parameters |
---|---|---|
appendRow(JsonObject rowData) | Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file. | rowData: A gson JsonObject that stores the data of one row. |
commit(boolean async) | Forces the remaining buffered data to be persisted to data files and completes the writer. | async: If false, the call blocks until all data files are persisted; if true, persisting runs in the background and the call may return before it finishes. |
getBatchFiles() | Returns a List<List<String>> of the persisted data files. Each inner List<String> is one batch of files that can be passed as a single job to the bulkInsert interface. | N/A |
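The buffering contract in the table above (appendRow() buffers rows, exceeding a threshold triggers a flush, commit() persists whatever remains, and getBatchFiles() returns one list per persisted batch) can be illustrated with a toy model. This is a hypothetical sketch that only mimics the documented behavior: ToyBulkWriter, its file names, and its character-count size estimate are all made up, whereas the real RemoteBulkWriter serializes rows to Parquet and uploads them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the buffering behavior of a bulk writer.
// NOT the SDK's implementation: "files" are just names, and the size of a
// row is approximated by the length of its string.
public class ToyBulkWriter {
    private final long chunkSize;                   // flush threshold
    private final List<String> buffer = new ArrayList<>();
    private long bufferSize = 0;
    private int flushCount = 0;
    private final List<List<String>> batchFiles = new ArrayList<>();

    public ToyBulkWriter(long chunkSize) {
        this.chunkSize = chunkSize;
    }

    // Mirrors appendRow(): buffer the row, persist once the threshold is exceeded.
    public void appendRow(String row) {
        buffer.add(row);
        bufferSize += row.length();
        if (bufferSize > chunkSize) {
            flush();
        }
    }

    // Mirrors a synchronous commit: persist whatever is still buffered.
    public void commit() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    // Mirrors getBatchFiles(): one inner list per persisted batch.
    public List<List<String>> getBatchFiles() {
        return batchFiles;
    }

    private void flush() {
        // A real writer would serialize the buffered rows and upload the file here.
        flushCount++;
        batchFiles.add(List.of("batch_" + flushCount + "/1.parquet"));
        buffer.clear();
        bufferSize = 0;
    }

    public static void main(String[] args) {
        ToyBulkWriter writer = new ToyBulkWriter(10);
        for (int i = 0; i < 7; i++) {
            writer.appendRow("row-" + i);           // each row is 5 characters long
        }
        writer.commit();
        // prints [[batch_1/1.parquet], [batch_2/1.parquet], [batch_3/1.parquet]]
        System.out.println(writer.getBatchFiles());
    }
}
```

With a threshold of 10, every third row pushes the buffer past the limit, so the seven rows produce two automatic flushes and commit() persists the last row as a third batch.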
RemoteBulkWriterParam
Use the RemoteBulkWriterParam.Builder to construct a RemoteBulkWriterParam object.
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam.Builder builder = RemoteBulkWriterParam.newBuilder();
Methods of RemoteBulkWriterParam.Builder:
Method | Description | Parameters |
---|---|---|
withCollectionSchema(CollectionSchemaParam collectionSchema) | Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section. | collectionSchema: The collection schema. |
withConnectParam(StorageConnectParam connectParam) | Sets the connection parameters for the remote storage service. Currently, two options are available: S3ConnectParam and AzureConnectParam. | connectParam: Connection parameters for the remote storage service. |
withRemotePath(String remotePath) | Sets the path on the remote storage service to which the data files are uploaded. | remotePath: A path on the remote storage service. |
withChunkSize(int chunkSize) | Sets the maximum size of a data chunk, in bytes. | chunkSize: The maximum size of a data chunk, in bytes. |
withFileType(BulkFileType fileType) | Sets the type of the output file. Currently, only PARQUET is available. | fileType: The output file type. |
build() | Constructs a RemoteBulkWriterParam object. | N/A |
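Because withChunkSize() takes an int, and assuming (as the 512 * 1024 * 1024 in the example below suggests) that chunkSize is measured in bytes, the largest expressible chunk is Integer.MAX_VALUE bytes, just under 2 GiB. A plain expression such as 2048 * 1024 * 1024 silently wraps around to a negative number. The hypothetical helper below uses Math.multiplyExact so that such a value fails loudly instead.

```java
// Hypothetical helper (not part of the Milvus SDK): converts mebibytes to a
// byte count for withChunkSize(int), throwing instead of silently overflowing.
public class ChunkSizes {
    static final int MIB = 1024 * 1024;

    static int mebibytes(int mib) {
        // Math.multiplyExact throws ArithmeticException on int overflow.
        return Math.multiplyExact(mib, MIB);
    }

    public static void main(String[] args) {
        System.out.println(mebibytes(512));       // 536870912, the value used in the example
        try {
            mebibytes(2048);                      // 2 GiB does not fit in an int
        } catch (ArithmeticException e) {
            System.out.println("chunk size too large: " + e.getMessage());
        }
    }
}
```

A result such as mebibytes(512) could then be passed to withChunkSize().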
AzureConnectParam
Use the AzureConnectParam.Builder to construct an AzureConnectParam object.
import io.milvus.bulkwriter.connect.AzureConnectParam;
AzureConnectParam.Builder builder = AzureConnectParam.newBuilder();
Methods of AzureConnectParam.Builder:
Method | Description | Parameters |
---|---|---|
withContainerName(String containerName) | Sets the Azure container name. | containerName: The target container name. |
withConnStr(String connStr) | Sets the connection string. | connStr: A connection string to an Azure Storage account, which can be parsed into an account URL and a credential. To generate a connection string, see https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string |
withAccountUrl(String accountUrl) | Sets the account URL. | accountUrl: A string in the format https://<storage-account>.blob.core.windows.net. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview |
withCredential(TokenCredential credential) | Sets the credential. | credential: The account access key for the account. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys |
build() | Constructs an AzureConnectParam object. | N/A |
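The table notes that a connection string can be parsed into an account URL and a credential. An Azure Storage connection string is a semicolon-separated list of key=value pairs such as DefaultEndpointsProtocol, AccountName, AccountKey, and EndpointSuffix. The hypothetical helper below (AzureConnStr is not part of the Milvus SDK, and it handles only these common keys plus an explicit BlobEndpoint) shows how the account URL expected by withAccountUrl() relates to such a string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the Milvus SDK): derives the Blob service
// account URL from an Azure Storage connection string such as
// DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net
public class AzureConnStr {

    static Map<String, String> parse(String connStr) {
        Map<String, String> parts = new HashMap<>();
        for (String segment : connStr.split(";")) {
            if (segment.isBlank()) {
                continue;                          // tolerate empty segments
            }
            // Split on the FIRST '=' only: Base64 account keys often end in '='.
            int eq = segment.indexOf('=');
            if (eq > 0) {
                parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    static String accountUrl(String connStr) {
        Map<String, String> parts = parse(connStr);
        // An explicit BlobEndpoint takes precedence when present.
        String blobEndpoint = parts.get("BlobEndpoint");
        if (blobEndpoint != null) {
            return blobEndpoint;
        }
        String account = parts.get("AccountName");
        if (account == null) {
            throw new IllegalArgumentException("connection string has no AccountName");
        }
        String protocol = parts.getOrDefault("DefaultEndpointsProtocol", "https");
        String suffix = parts.getOrDefault("EndpointSuffix", "core.windows.net");
        return protocol + "://" + account + ".blob." + suffix;
    }

    public static void main(String[] args) {
        String connStr = "DefaultEndpointsProtocol=https;AccountName=mystorageacct;"
                + "AccountKey=abc123==;EndpointSuffix=core.windows.net";
        System.out.println(accountUrl(connStr));   // https://mystorageacct.blob.core.windows.net
    }
}
```

In practice you would normally pass the whole string to withConnStr() and let the SDK parse it; deriving the URL yourself is only useful when you supply withAccountUrl() and withCredential() separately.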
S3ConnectParam
Use the S3ConnectParam.Builder to construct an S3ConnectParam object.
import io.milvus.bulkwriter.connect.S3ConnectParam;
S3ConnectParam.Builder builder = S3ConnectParam.newBuilder();
Methods of S3ConnectParam.Builder:
Method | Description | Parameters |
---|---|---|
withCloudName(String cloudName) | Sets the name of the cloud provider that hosts the S3-compatible storage. | cloudName: The cloud name. |
withBucketName(String bucketName) | Sets the bucket name. | bucketName: The bucket name. |
withEndpoint(String endpoint) | Sets the endpoint. | endpoint: The endpoint. |
withAccessKey(String accessKey) | Sets the access key. | accessKey: The access key. |
withSecretKey(String secretKey) | Sets the secret key. | secretKey: The secret key. |
withSessionToken(String sessionToken) | Sets the session token. | sessionToken: The session token. |
withRegion(String region) | Sets the region name. | region: The region name. |
withHttpClient(OkHttpClient httpClient) | Sets a custom HTTP client, if necessary. | httpClient: The HTTP client to use. |
build() | Constructs an S3ConnectParam object. | N/A |
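The example below passes placeholder constants such as STORAGE_ACCESS_KEY to these builder methods. One common alternative to hard-coding secrets is to read them from the environment. The sketch below is hypothetical: S3_ENDPOINT, S3_BUCKET, and S3_CLOUD_NAME are made-up variable names, the AWS_* names follow AWS conventions, and treating endpoint, bucket, access key, and secret key as required (with region, cloud name, and session token optional) is an assumption for illustration, not an SDK rule.

```java
import java.util.Map;

// Hypothetical helper: gathers the values later passed to S3ConnectParam.Builder
// from environment variables instead of hard-coding them in source.
public class S3Settings {
    record Settings(String endpoint, String cloudName, String bucket, String accessKey,
                    String secretKey, String region, String sessionToken) {}

    static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("missing environment variable " + name);
        }
        return value;
    }

    static Settings fromEnv(Map<String, String> env) {
        return new Settings(
                required(env, "S3_ENDPOINT"),
                env.get("S3_CLOUD_NAME"),              // optional in this sketch
                required(env, "S3_BUCKET"),
                required(env, "AWS_ACCESS_KEY_ID"),
                required(env, "AWS_SECRET_ACCESS_KEY"),
                env.get("AWS_REGION"),                 // may be null
                env.get("AWS_SESSION_TOKEN"));         // set only for temporary credentials
    }

    public static void main(String[] args) {
        try {
            Settings s = fromEnv(System.getenv());
            // Never print the keys themselves.
            System.out.println("endpoint=" + s.endpoint() + ", bucket=" + s.bucket());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Each field would then be handed to the matching S3ConnectParam.Builder method, calling withSessionToken() only when sessionToken is non-null.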
Example
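import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.grpc.DataType;
import io.milvus.grpc.ImportResponse;
import io.milvus.param.R;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// DIM, COLLECTION_NAME, CLOUD_NAME and the STORAGE_* constants are placeholders you define.
// milvusClient is an already-connected MilvusServiceClient.

// Schema of the target collection: an Int64 primary key and a float vector field.
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();

// Connection parameters for the S3-compatible bucket that receives the data files.
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
        .withEndpoint(STORAGE_ENDPOINT)
        .withCloudName(CLOUD_NAME)
        .withBucketName(STORAGE_BUCKET)
        .withAccessKey(STORAGE_ACCESS_KEY)
        .withSecretKey(STORAGE_SECRET_KEY)
        .withRegion(STORAGE_REGION)
        .build();

// Write Parquet files under "bulk_data", flushing the buffer once it grows past 512 MB.
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withRemotePath("bulk_data")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .withConnectParam(connectParam)
        .build();

try (RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    Random random = new Random();
    for (int i = 0; i < 10000; ++i) {
        // Build a random float vector of length DIM for this row.
        List<Float> vector = new ArrayList<>(DIM);
        for (int d = 0; d < DIM; ++d) {
            vector.add(random.nextFloat());
        }
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        row.add("vector", gson.toJsonTree(vector));
        remoteBulkWriter.appendRow(row);
    }
    System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
    System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());

    // commit(false) blocks until every data file has been persisted.
    System.out.println("Generating data files...");
    remoteBulkWriter.commit(false);
    List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
    System.out.printf("Data files have been uploaded: %s%n", batchFiles);

    // Submit each batch as one bulk-insert job. Milvus reads these paths from its own
    // object storage, so STORAGE_BUCKET should be the bucket Milvus is configured to use.
    for (List<String> files : batchFiles) {
        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
                .withCollectionName(COLLECTION_NAME)
                .withFiles(files)
                .build());
        if (response.getStatus() != R.Status.Success.getCode()) {
            System.out.println("bulkInsert failed: " + response.getMessage());
        } else {
            System.out.println("bulkInsert task IDs: " + response.getData().getTasksList());
        }
    }
} catch (Exception e) {
    System.out.println("RemoteBulkWriter caught exception: " + e);
    throw e;
}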