
Data Intelligence SDK Tutorial

This tutorial provides a step-by-step guide for using the Data Intelligence (DI) SDK, focusing on both administrative and non-administrative workflows. It covers client setup, pipeline and collection creation, bucket assignment, and similarity search.

Logging

The SDK writes detailed logs to di_sdk.log in the current working directory (CWD). Set the LOG_LEVEL environment variable to adjust the verbosity. For example, export LOG_LEVEL=DEBUG enables debug logging for more detailed analysis.
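The log level can also be set from Python rather than from the shell. The following is a minimal sketch, assuming the SDK reads LOG_LEVEL when it is first imported (the tutorial does not state when the variable is read), so the assignment is placed before any pydi_client import.

```python
import os

# Assumption: LOG_LEVEL is read when the SDK is imported, so set it first.
os.environ["LOG_LEVEL"] = "DEBUG"

# Import the SDK only after this point, for example:
# from pydi_client.di_client import DIClient

# di_sdk.log is created in the current working directory.
expected_log_path = os.path.join(os.getcwd(), "di_sdk.log")
print(expected_log_path)
```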

1. Admin Operations: Setting Up DIAdminClient

Administrative operations (CRUD for pipelines, collections, schemas, models) require the DIAdminClient. This client needs authentication credentials.

from pydi_client.di_client import DIAdminClient

# Initialize DIAdminClient with URI, username, and password
admin_client = DIAdminClient(
    uri="https://your-di-instance.com:<port>",
    username="admin_user",
    password="your_password"
)

Use DIAdminClient for:
- Creating/deleting pipelines and collections
- Assigning/unassigning buckets
- Managing schemas and embedding models
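Hard-coded credentials are convenient in a tutorial but risky in shared code. One possible approach is to read them from the environment. This is a sketch only: the helper load_admin_settings and the variable names DI_URI, DI_USERNAME, and DI_PASSWORD are hypothetical and not part of the SDK.

```python
import os


def load_admin_settings(env=os.environ):
    """Collect DIAdminClient keyword arguments from environment variables.

    DI_URI, DI_USERNAME and DI_PASSWORD are hypothetical names chosen for
    this sketch; the SDK itself does not define them.
    """
    required = {"uri": "DI_URI", "username": "DI_USERNAME", "password": "DI_PASSWORD"}
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {arg: env[var] for arg, var in required.items()}


# Usage (requires the SDK to be installed and the variables above to be set):
# from pydi_client.di_client import DIAdminClient
# admin_client = DIAdminClient(**load_admin_settings())
```

The returned dictionary uses the same keyword names (uri, username, password) as the DIAdminClient constructor call shown above, so it can be unpacked directly.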


2. Non-Admin Operations: Setting Up DIClient

For non-admin tasks (querying collections, retrieving pipelines, similarity search), use DIClient. DIAdminClient extends DIClient and therefore exposes the same methods, but prefer DIClient for read-only and search operations.

from pydi_client.di_client import DIClient

# Initialize DIClient with the DI platform URI
client = DIClient(uri="https://your-di-instance.com:<port>")

Use DIClient for:
- Querying collections and pipelines
- Performing similarity searches


3. Getting List of Existing Schemas (Admin)

Before creating a pipeline, you may want to see which schemas are available in your DI instance. This helps you select the correct schema for your workflow.

# Get all schemas available in the DI platform
schemas_response = admin_client.get_all_schemas()
print(schemas_response)
# Output: V1ListSchemasResponse(
#     schemas=[SchemaRecordSummary(name="example_schema", ...), ...]
# )

You can inspect the schema names and details to choose the appropriate schema for your pipeline.
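Selecting a schema by name can be sketched as follows. The helper find_by_name is hypothetical, and SimpleNamespace objects stand in for the real response so the snippet runs without a DI instance. It assumes only what the output above shows: a schemas list whose items have a name attribute. The name "other_schema" is made up for illustration.

```python
from types import SimpleNamespace


def find_by_name(records, name):
    """Return the first record whose .name equals name, or None."""
    return next((r for r in records if getattr(r, "name", None) == name), None)


# Stand-in for the object returned by admin_client.get_all_schemas().
schemas_response = SimpleNamespace(
    schemas=[SimpleNamespace(name="example_schema"), SimpleNamespace(name="other_schema")]
)

available = [s.name for s in schemas_response.schemas]
print(available)

chosen = find_by_name(schemas_response.schemas, "example_schema")
if chosen is None:
    raise ValueError(f"Schema not found; available: {available}")
```

The same helper should work for the models list returned by get_all_embedding_models, since its items also carry a name.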

Note: Creating new schemas is not currently supported. Use one of the schemas available by default.


4. Getting List of Existing Embedding Models (Admin, RAG Workflow)

If you are setting up a RAG (Retrieval-Augmented Generation) pipeline, you need to select an embedding model supported by DI.

# Get all embedding models available in the DI platform
models_response = admin_client.get_all_embedding_models()
print(models_response)
# Output: V1ListModelsResponse(
#     models=[ModelRecordSummary(name="example_model", ...), ...]
# )

Review the available models and select the one that fits your use case.

Note: Creating new embedding models is not currently supported. Use one of the models available by default.


5. Creating a Pipeline (Admin)

A pipeline defines how data is processed and ingested. Use create_pipeline in DIAdminClient to set up a pipeline.

# Create a RAG pipeline
pipeline_response = admin_client.create_pipeline(
    name="example_rag_pipeline",
    pipeline_type="rag",  # or "metadata"
    model="example_model",  # Required for RAG pipelines: embedding model name
    # custom_func is omitted here: it is not supported for RAG pipelines
    event_filter_object_suffix=["*.txt", "*.pdf"],  # File types to ingest
    event_filter_max_object_size=10485760,  # Required for RAG: max file size in bytes (10 MiB)
    schema="example_schema"  # Optional: specify schema
)

print(pipeline_response)
# Output: V1CreatePipelineResponse(
#     success=True,
#     message="Pipeline 'example_rag_pipeline' created successfully."
# )

NOTE:
- For pipeline_type = "rag" (RAG workflows), model and event_filter_max_object_size are required, schema is optional, and custom_func is not supported.
- For pipeline_type = "metadata" (metadata pipelines), custom_func is required, schema and event_filter_max_object_size are optional, and model is not supported.
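The rules in this note can be checked locally before calling create_pipeline. The function below is a hypothetical helper, not part of the SDK. It encodes only the rules stated in the note, and it treats any pipeline type other than "rag" or "metadata" as unrecognized, which is an assumption.

```python
def check_pipeline_args(pipeline_type, model=None, custom_func=None,
                        event_filter_max_object_size=None):
    """Return a list of problems with the arguments; an empty list means none found."""
    problems = []
    if pipeline_type == "rag":
        if model is None:
            problems.append("model is required for rag pipelines")
        if event_filter_max_object_size is None:
            problems.append("event_filter_max_object_size is required for rag pipelines")
        if custom_func is not None:
            problems.append("custom_func is not supported for rag pipelines")
    elif pipeline_type == "metadata":
        if custom_func is None:
            problems.append("custom_func is required for metadata pipelines")
        if model is not None:
            problems.append("model is not supported for metadata pipelines")
    else:
        # Assumption: only the two types named in the tutorial are valid.
        problems.append(f"unrecognized pipeline_type: {pipeline_type!r}")
    return problems


# A valid RAG configuration with a 10 MiB size limit produces no problems.
print(check_pipeline_args("rag", model="example_model",
                          event_filter_max_object_size=10 * 1024 * 1024))

# A metadata pipeline with a model and no custom_func produces two problems.
print(check_pipeline_args("metadata", model="example_model"))
```

Returning a list rather than raising on the first problem lets a caller report every issue at once.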


6. Creating a Collection (Admin)

Collections are logical groupings of data that use a pipeline for ingestion and processing.

# Create a collection using the pipeline created above
collection_response = admin_client.create_collection(
    name="example_collection",
    pipeline="example_rag_pipeline",
    buckets=[]  # You can assign buckets now or later
)

print(collection_response)
# Output: V1CollectionResponse(
#     name="example_collection",
#     pipeline="example_rag_pipeline",
#     buckets=[]
# )

7. Assigning S3 Buckets to a Collection (Admin)

Assigning buckets triggers the pipeline and enables data ingestion. Buckets typically refer to S3 buckets from X10K.

# Assign S3 buckets to the collection
bucket_update_response = admin_client.assign_buckets_to_collection(
    collection_name="example_collection",
    buckets=["homefleet-bucket1", "homefleet-bucket2"]
)

print(bucket_update_response)
# Output: BucketUpdateResponse(
#     success=True,
#     message="Buckets assigned successfully to collection 'example_collection'."
# )

Note: You can also unassign buckets using unassign_buckets_from_collection.
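When a collection's bucket list changes over time, it can help to work out which buckets to assign and which to unassign before making any calls. The helper plan_bucket_changes below is a hypothetical sketch that uses plain set differences. The name "homefleet-bucket3" is illustrative only.

```python
def plan_bucket_changes(current, desired):
    """Return (to_assign, to_unassign) as sorted lists of bucket names."""
    current_set, desired_set = set(current), set(desired)
    to_assign = sorted(desired_set - current_set)
    to_unassign = sorted(current_set - desired_set)
    return to_assign, to_unassign


to_assign, to_unassign = plan_bucket_changes(
    current=["homefleet-bucket1", "homefleet-bucket2"],
    desired=["homefleet-bucket2", "homefleet-bucket3"],
)
print(to_assign)    # ['homefleet-bucket3']
print(to_unassign)  # ['homefleet-bucket1']
```

The to_assign list can then be passed as the buckets argument of assign_buckets_to_collection. The parameters of unassign_buckets_from_collection are not shown in this tutorial, so check the API reference before passing to_unassign to it.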


8. Performing Similarity Search (User)

Once data is ingested, users can perform similarity searches with DIClient. This operation requires an S3 access key and an S3 secret key, which authorize access to data in the X10K buckets. Only data that the keys are authorized for can be retrieved through similarity search.

# Perform a similarity search in a collection
results = client.similarity_search(
    query="machine learning",
    collection_name="example_collection",
    top_k=5,
    access_key="your_access_key",
    secret_key="your_secret_key",
)

print(results)
# Output: List of dictionaries with top-k similar results
# [
#     {
#         "dataChunk": "chunk1",
#         "score": 0.9,
#         "chunkMetadata": {
#             "objectKey": "value",
#             "startCharIndex": 1,
#             "endCharIndex": 2,
#             "bucketName": "string",
#             "pageLabel": "string",
#             "versionId": "string",
#         }
#     },
#     ...
# ]
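The returned list can be post-processed like any list of dictionaries. The sketch below filters results by a minimum score and prints a one-line summary of each hit. The helpers filter_results and describe are hypothetical, the sample values are made up, and the sketch assumes that a higher score means a closer match.

```python
def filter_results(results, min_score):
    """Keep results scoring at least min_score, highest score first."""
    kept = [r for r in results if r.get("score", 0.0) >= min_score]
    return sorted(kept, key=lambda r: r.get("score", 0.0), reverse=True)


def describe(result):
    """Format one result as 'bucket/key [start:end] score=x.xx'."""
    meta = result.get("chunkMetadata", {})
    return (f"{meta.get('bucketName')}/{meta.get('objectKey')} "
            f"[{meta.get('startCharIndex')}:{meta.get('endCharIndex')}] "
            f"score={result.get('score', 0.0):.2f}")


# Sample data in the shape shown above; the values are illustrative.
sample = [
    {"dataChunk": "chunk2", "score": 0.4,
     "chunkMetadata": {"objectKey": "b.txt", "bucketName": "b1",
                       "startCharIndex": 10, "endCharIndex": 80}},
    {"dataChunk": "chunk1", "score": 0.9,
     "chunkMetadata": {"objectKey": "a.txt", "bucketName": "b1",
                       "startCharIndex": 0, "endCharIndex": 120}},
]

for r in filter_results(sample, min_score=0.5):
    print(describe(r))
```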

Summary

  • Use DIAdminClient for all admin operations (CRUD on pipelines, collections, schemas, models).
  • Use DIClient for non-admin operations (search, read-only queries).
  • The typical workflow is: Create Pipeline → Create Collection → Assign Buckets → Ingest Data → Search Data.
  • Refer to the API reference for more advanced features and error handling.