253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637 | class DIAdminClient(DIClient):
"""
DIAdminClient
The `DIAdminClient` class is an extension of the `DIClient` class, designed specifically for administrative operations
within the Data Intelligence (DI) platform. This class provides methods to manage collections, pipelines, schemas,
and their associated resources. It is intended for use cases where administrative privileges are required to perform
operations such as creating, deleting, or modifying collections, pipelines, and schemas.
Purpose:
---------
The `DIAdminClient` is tailored for scenarios where administrative-level API interactions are necessary.
It provides a higher level of control over the DI platform's resources, enabling administrators to configure
and manage the system effectively.
When to Use:
------------
- Use this class when you need to perform administrative tasks such as:
- Creating or deleting collections.
- Assigning or unassigning buckets to/from collections.
- Creating or deleting pipelines.
- Creating or deleting schemas.
- This class is specifically designed for admin APIs. For non-admin APIs, use the `DIClient` class instead.
Initialization:
---------------
The `DIAdminClient` requires authentication credentials (username and password) to establish an authenticated session
with the DI platform. Upon initialization, it creates an authenticated session that is used for all subsequent API calls.
--------
"""
def __init__(self, *, uri: str, username: str, password: str) -> None:
super().__init__(uri=uri)
# create session with auth
self._authenticated_session = AuthAPI.login(
uri=uri, username=username, password=password
)
@property
def authenticated_session(self) -> AuthenticatedSession:
"""
Property to get the authenticated session object.
Returns:
AuthenticatedSession: The authenticated session object used for making API requests.This session is initialized with the provided URI, username, password, and token.
"""
return self._authenticated_session
def create_collection(
self, *, name: str, pipeline: str, buckets: Optional[List[str]] = None
) -> V1CollectionResponse:
"""
Creates a new collection using the specified pipeline.
If buckets are provided, they will be associated with the collection and inline embedding generation will be triggered.
If buckets are not provided, the collection will be created but embedding generation will be deferred until buckets are assigned to the collection.
Args:
name (str): The name of the collection to be created. This should be unique.
pipeline (str): The name of the pipeline to be associated with the collection.
buckets (Optional[List[str]], optional): A list of bucket names. Defaults to None.
Returns:
V1CollectionResponse: The created collection object.
Example usage:
```python
# Example usage of create_collection
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
collection = client.create_collection(
name="example_collection",
pipeline="data_ingestion_pipeline",
buckets=["bucket1", "bucket2"]
)
print(collection)
# Sample Output:
# V1CollectionResponse(
# name="example_collection",
# pipeline="data_ingestion_pipeline",
# buckets=["bucket1", "bucket2"]
# )
```
"""
if buckets is None:
buckets = []
return CollectionAPI(session=self.authenticated_session).create_collection(
name=name,
buckets=buckets,
pipeline=pipeline,
)
def delete_collection(self, *, name: str) -> V1DeleteCollectionResponse:
"""
Deletes a collection by its name.
Args:
name (str): The name of the collection to be deleted.
Returns:
None: This method does not return any value.
Example Usage:
```python
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
resp = client.delete_collection(name="example_collection")
print(resp)
# Output:
# V1DeleteCollectionResponse(
# success=True,
# message="Collection 'example_collection' has been deleted."
# )
```
"""
return CollectionAPI(session=self.authenticated_session).delete_collection(
name=name
)
def assign_buckets_to_collection(
self, *, collection_name: str, buckets: List[str]
) -> BucketUpdateResponse:
"""
Assigns a list of buckets to a specified collection.
This enables inline embedding generation for the specified buckets.
Args:
collection_name (str): The name of the collection to which the buckets
will be assigned.
buckets (List[str]): A list of bucket names to be assigned to the
specified collection.
Returns:
BucketUpdateResponse: The response object containing details about the
updated collection and assigned buckets.
Example Usage:
```python
# Initialize the DIAdminClient
client = DIAdminClient(uri="http://example.com", username="admin", password="password")
# Define the collection name and buckets
collection_name = "my_collection"
buckets = ["bucket1", "bucket2", "bucket3"]
# Assign buckets to the collection
response = client.assign_buckets_to_collection(
collection_name=collection_name,
buckets=buckets
)
print(response)
# Output:
# BucketUpdateResponse(
# sucess=true,
# message="Buckets assigned successfully to collection 'my_collection'."
# )
```
Notes:
- This method is typically used for enabling the user buckets for intelligence using an existing collection.
"""
return CollectionAPI(
session=self.authenticated_session
).assign_buckets_to_collection(collection_name=collection_name, buckets=buckets)
def unassign_buckets_from_collection(
self, *, collection_name: str, buckets: List[str]
) -> BucketUpdateResponse:
"""
Unassigns one or more buckets from a specified collection.
Args:
collection_name (str): The name of the collection from which the buckets will be unassigned.
buckets (List[str]): A list of bucket names to be unassigned.
Returns:
BucketUpdateResponse: The response object containing details about the updated collection
after the buckets have been unassigned.
Example usage:
```python
# Example usage of unassign_buckets_from_collection
client = DIAdminClient(uri="http://example.com", username="admin", password="password")
# Unassign multiple buckets
response = client.unassign_buckets_from_collection(
collection_name="example_collection",
buckets=["bucket_1", "bucket_2", "bucket_3"]
)
print(response)
# Output:
# BucketUpdateResponse(
# success=true,
# message="Buckets unassigned successfully from collection 'example_collection'."
# )
```
"""
return CollectionAPI(
session=self.authenticated_session
).unassign_buckets_from_collection(
collection_name=collection_name, buckets=buckets
)
def create_pipeline(
self,
*,
name: str,
pipeline_type: str,
event_filter_object_suffix: List[str],
event_filter_max_object_size: Optional[int] = None,
schema: Optional[str] = None,
model: Optional[str] = None,
custom_func: Optional[str] = None,
) -> V1CreatePipelineResponse:
"""
Creates a new pipeline with the specified configuration.
Args:
name (str): The name of the pipeline to be created.
pipeline_type (str): The type of the pipeline (e.g., "rag", "metadata").
model Optional (str): The model associated with the pipeline.
custom_func Optional (str): The custom function to be used in the pipeline.
event_filter_object_suffix (List[str]): A list of file suffixes to filter events. Ex - ["*.txt", "*.pdf"]
event_filter_max_object_size (int): The maximum object size for event filtering. Ex - 10485760
schema Optional (str): The schema definition for the pipeline.
Returns:
V1CreatePipelineResponse: The response object containing details of the created pipeline.
Example usage:
```python
client = DIAdminClient(
uri="http://example.com",
username="admin",
password="password"
)
pipeline_data = client.create_pipeline(
name="example_pipeline",
pipeline_type="rag",
model="example_model",
custom_func="custom_processing_function",
event_filter_object_suffix=["*.txt", "*.pdf"],
event_filter_max_object_size=10485760,
schema="example_schema"
)
print(pipeline_data)
# Output: V1CreatePipelineResponse(
# success=true,
# message="Pipeline 'example_pipeline' created successfully."
# )
```
"""
return PipelineAPI(session=self.authenticated_session).create_pipeline(
name=name,
pipeline_type=pipeline_type,
model=model,
custom_func=custom_func,
event_filter_object_suffix=event_filter_object_suffix,
event_filter_max_object_size=event_filter_max_object_size,
schema=schema,
)
def delete_pipeline(self, *, name: str) -> V1DeletePipelineResponse:
"""
Deletes a pipeline with the specified name.
Args:
name (str): The name of the pipeline to be deleted.
Returns:
V1DeletePipelineResponse: The response object containing details about the deleted pipeline.
Example usage:
```python
# Initialize the DIAdminClient
client = DIAdminClient(uri="http://example.com", username="admin", password="password")
# Delete a pipeline by name
response = client.delete_pipeline(name="example_pipeline")
print(response)
# Output:
# V1DeletePipelineResponse(
# message="Pipeline successfully deleted"
# success=True,
# )
```
"""
return PipelineAPI(session=self.authenticated_session).delete_pipeline(
name=name
)
def get_schema(self, *, name: str) -> V1SchemasResponse:
"""
Retrieve a schema by its name.
This method fetches a schema object from the SchemaAPI using the provided name.
Args:
name (str): The name of the schema to retrieve. This is a required keyword-only argument.
Returns:
V1SchemaResponse: The response object containing details about the schema.
Example usage:
```python
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
schema = client.get_schema(name="example_schema")
print(schema)
# Output: V1SchemaResponse(
# name="example_schema",
# type="...",
# schema=[SchemaItem]
# )
"""
return SchemaAPI(session=self.authenticated_session).get_schema(name=name)
def get_all_schemas(self) -> V1ListSchemasResponse:
"""
Retrieves all schemas available in the system.
Returns:
V1ListSchemasResponse: A response object containing a list of
schemas available in the system.
Example usage:
```python
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
schemas = client.get_all_schemas()
print(schemas)
# Output: V1ListSchemasResponse(
# schemas=[SchemaRecordSummary]
# )
```
"""
return SchemaAPI(session=self.authenticated_session).get_schemas()
def get_embedding_model(self, *, name: str) -> V1ModelsResponse:
"""
Retrieve an embedding model by its name.
This method fetches an embedding model object from the EmbeddingModelAPI using the provided name.
Args:
name (str): The name of the embedding model to retrieve. This is a required keyword-only argument.
Returns:
V1ModelsResponse: The response object containing details about the embedding model.
Example usage:
```python
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
model = client.get_embedding_model(name="example_model")
print(model)
# Output: V1ModelsResponse(
# name="example_model",
# modelName="...",
# capabilities="...",
# dimension=..., # e.g., 768
# maximumTokens=..., # e.g., 512
# version="..."
# )
```
"""
return EmbeddingModelAPI(self.authenticated_session).get_model(name=name)
def get_all_embedding_models(self) -> V1ListModelsResponse:
"""
Retrieves all embedding models available in the system.
Returns:
V1ListModelsResponse: A response object containing a list of
embedding models available in the system.
Example usage:
```python
client = DIAdminClient(uri="https://example.com", username="admin", password="password")
models = client.get_all_embedding_models()
print(models[0])
# Output: V1ModelsResponse(
# models=[ModelRecordSummary],
# )
```
"""
return EmbeddingModelAPI(self.authenticated_session).get_models()
|