LocalBulkWriter
A LocalBulkWriter instance rewrites your raw data locally in a format that Milvus understands.
LocalBulkWriter(LocalBulkWriterParam bulkWriterParam)
Methods of LocalBulkWriter:
Method | Description | Parameters |
---|---|---|
appendRow(JsonObject rowData) | Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file. | rowData: A gson.JsonObject that stores the data of a row. |
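commit(boolean async) | Forces the buffered data to be persisted to data files and completes the writer. | async: Set to true to persist the data files in the background without blocking the caller; set to false to wait until all data files are persisted. |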
getBatchFiles() | Returns a List<List<String>> of the persisted data files. Each List<String> is a batch of files that can be passed as one job to the bulkinsert interface. | N/A |
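The buffering behaviour described for appendRow() and commit() can be pictured with a small standalone sketch. It is not the Milvus implementation and does not call the SDK: the class BufferedRowWriter, its character-count size estimate, and the in-memory "chunks" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is NOT the Milvus LocalBulkWriter.
// Rows are plain strings and "persisting" just moves them to an in-memory list.
class BufferedRowWriter {
    private final int thresholdChars;              // assumed flush threshold
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedChunks = new ArrayList<>();
    private int bufferedChars = 0;

    BufferedRowWriter(int thresholdChars) {
        this.thresholdChars = thresholdChars;
    }

    // Add a row; flush automatically once the buffer grows past the threshold.
    void appendRow(String row) {
        buffer.add(row);
        bufferedChars += row.length();
        if (bufferedChars > thresholdChars) {
            flush();
        }
    }

    // Flush whatever is still buffered, mirroring the "force persist" idea.
    void commit() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    // Each flush produces one chunk, standing in for one data file.
    List<List<String>> getFlushedChunks() {
        return flushedChunks;
    }

    private void flush() {
        flushedChunks.add(new ArrayList<>(buffer));
        buffer.clear();
        bufferedChars = 0;
    }

    public static void main(String[] args) {
        BufferedRowWriter writer = new BufferedRowWriter(10);
        for (int i = 0; i < 5; i++) {
            writer.appendRow("aaaa");
        }
        writer.commit();
        System.out.println("chunks written: " + writer.getFlushedChunks().size());
    }
}
```

The point of the sketch is the split of responsibilities: appendRow() only flushes when the threshold is crossed, so a final commit() is needed to persist the remaining rows.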
LocalBulkWriterParam
Use the LocalBulkWriterParam.Builder to construct a LocalBulkWriterParam object.
import io.milvus.bulkwriter.LocalBulkWriterParam;
LocalBulkWriterParam.Builder builder = LocalBulkWriterParam.newBuilder();
Methods of LocalBulkWriterParam.Builder:
Method | Description | Parameters |
---|---|---|
withCollectionSchema(CollectionSchemaParam collectionSchema) | Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section. | collectionSchema: The collection schema. |
withLocalPath(String localPath) | Sets the local path to which the data files are written. | localPath: A local path. |
withChunkSize(int chunkSize) | Sets the maximum size of a data chunk. | chunkSize: the maximum size of a data chunk. |
withFileType(BulkFileType fileType) | Sets the type of the output file. Currently, only PARQUET is available. | fileType: The output file type. |
build() | Constructs a LocalBulkWriterParam object. | N/A |
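To get a feel for a chunkSize value, it can help to estimate how many rows fit into one chunk. The sketch below is a rough back-of-the-envelope calculation, not SDK code. It assumes that chunkSize is measured in bytes, that an Int64 field takes 8 bytes and a float vector takes 4 bytes per dimension, and it ignores Parquet encoding and compression entirely. The class name ChunkSizeEstimate is hypothetical.

```java
// Hypothetical helper for a rough estimate; not part of the Milvus SDK.
class ChunkSizeEstimate {

    // Raw size of one row with an Int64 primary key and a float vector.
    // Assumption: 8 bytes for the key, 4 bytes per vector dimension.
    static long bytesPerRow(int dim) {
        return 8L + 4L * dim;
    }

    // Number of whole rows that fit into a chunk of the given size.
    // Assumption: chunkSizeBytes is expressed in bytes.
    static long rowsPerChunk(long chunkSizeBytes, int dim) {
        return chunkSizeBytes / bytesPerRow(dim);
    }

    public static void main(String[] args) {
        long chunkSize = 512L * 1024 * 1024; // same value as in the example on this page
        int dim = 512;                       // example dimension, chosen for illustration
        System.out.println("approx. rows per chunk: " + rowsPerChunk(chunkSize, dim));
    }
}
```

Under these assumptions, a 512 MiB chunk with 512-dimensional vectors holds roughly 261,000 rows. Actual file sizes depend on how the writer encodes the data, so treat the number as an order-of-magnitude guide only.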
Example
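import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import java.util.List;

// Vector dimension. 512 is only an example value; use the dimension of your own collection.
final int DIM = 512;

// Define a schema with an Int64 primary key and a float vector field.
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();

// Configure the writer: output directory, file type, and chunk size.
LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .build();

try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    // Append rows; the writer persists the buffer whenever it exceeds the threshold.
    for (int i = 0; i < 100000; i++) {
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        row.add("vector", gson.toJsonTree(GeneratorUtils.genFloatVector(DIM)));
        localBulkWriter.appendRow(row);
    }
    // Persist the remaining buffered rows before reading the file list.
    localBulkWriter.commit(false);
    List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
    System.out.printf("Local writer done! output local files: %s%n", batchFiles);
} catch (Exception e) {
    System.out.println("Local writer catch exception: " + e);
    throw e;
}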
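The value returned by getBatchFiles() is a list of batches, and each inner list is meant to be submitted as one bulkinsert job. A minimal sketch of walking that structure is shown here. It uses only the Java standard library, and both the class BatchFileSummary and the file paths in main are made up for illustration; real paths come from the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; not part of the Milvus SDK.
class BatchFileSummary {

    // Count every file across all batches.
    static int totalFiles(List<List<String>> batchFiles) {
        int total = 0;
        for (List<String> batch : batchFiles) {
            total += batch.size();
        }
        return total;
    }

    // Build one human-readable line per batch, i.e. per prospective import job.
    static List<String> describe(List<List<String>> batchFiles) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < batchFiles.size(); i++) {
            lines.add("batch " + i + ": " + batchFiles.get(i).size() + " file(s)");
        }
        return lines;
    }

    public static void main(String[] args) {
        // Made-up paths standing in for the writer's real output.
        List<List<String>> batchFiles = Arrays.asList(
                Arrays.asList("/tmp/example/1.parquet", "/tmp/example/2.parquet"),
                Arrays.asList("/tmp/example/3.parquet"));
        for (String line : describe(batchFiles)) {
            System.out.println(line);
        }
        System.out.println("total files: " + totalFiles(batchFiles));
    }
}
```

Keeping the outer list intact matters: flattening it would lose the grouping that tells you which files belong to the same import job.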