Preprocessor

delete_preprocessed_collection(collection_name, client_manager=None)

Delete a preprocessed collection. This function deletes the preprocessing metadata stored for a particular collection. It does so by removing the object in the ELYSIA_METADATA__ collection whose name matches the collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection whose preprocessed metadata should be deleted.

required
client_manager ClientManager

The client manager to use.

None
Source code in elysia/preprocess/collection.py
def delete_preprocessed_collection(
    collection_name: str, client_manager: ClientManager | None = None
) -> None:
    """
    Delete a preprocessed collection.
    This function allows you to delete the preprocessing done for a particular collection.
    It does so by deleting the object in the ELYSIA_METADATA__ collection with the name of the collection.

    Args:
        collection_name (str): The name of the collection whose preprocessed metadata should be deleted.
        client_manager (ClientManager): The client manager to use.
    """
    return asyncio_run(
        delete_preprocessed_collection_async(collection_name, client_manager)
    )
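The synchronous function is a thin wrapper that drives the async implementation to completion via asyncio_run. A minimal sketch of that wrapper pattern, using the standard library's asyncio.run in place of the library helper (all names below are illustrative, not part of the Elysia API):

```python
import asyncio

# Toy async implementation, standing in for delete_preprocessed_collection_async.
async def _delete_metadata_async(collection_name: str) -> str:
    # The real function would connect to Weaviate and delete the metadata
    # object; here we just report what would be deleted.
    return f"deleted metadata for {collection_name}"

def delete_metadata(collection_name: str) -> str:
    # Synchronous wrapper: run the async implementation to completion,
    # mirroring how the library wraps its *_async functions.
    return asyncio.run(_delete_metadata_async(collection_name))

result = delete_metadata("Articles")
```

This keeps a single source of truth in the async function while still offering a blocking entry point for scripts.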

delete_preprocessed_collection_async(collection_name, client_manager=None) async

Delete the preprocessed collection from the Weaviate cluster. This function simply deletes the cached preprocessed metadata from the Weaviate cluster. It does so by deleting the object in the collection ELYSIA_METADATA__ with the name of the collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to delete the preprocessed metadata for.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None
Source code in elysia/preprocess/collection.py
async def delete_preprocessed_collection_async(
    collection_name: str, client_manager: ClientManager | None = None
) -> None:
    """
    Delete the preprocessed collection from the Weaviate cluster.
    This function simply deletes the cached preprocessed metadata from the Weaviate cluster.
    It does so by deleting the object in the collection ELYSIA_METADATA__ with the name of the collection.

    Args:
        collection_name (str): The name of the collection to delete the preprocessed metadata for.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.
    """
    if client_manager is None:
        client_manager = ClientManager()
        close_clients_after_completion = True
    else:
        close_clients_after_completion = False

    async with client_manager.connect_to_async_client() as client:
        if await client.collections.exists("ELYSIA_METADATA__"):
            metadata_collection = client.collections.get("ELYSIA_METADATA__")
            metadata = await metadata_collection.query.fetch_objects(
                filters=Filter.by_property("name").equal(collection_name),
                limit=1,
            )
            if metadata is not None and len(metadata.objects) > 0:
                await metadata_collection.data.delete_by_id(metadata.objects[0].uuid)
            else:
                raise Exception(f"Metadata for {collection_name} does not exist")

    if close_clients_after_completion:
        await client_manager.close_clients()
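The core of the async deletion is: fetch the object in ELYSIA_METADATA__ whose name property equals the collection name, delete it by UUID, and raise if no such object exists. A self-contained sketch of that lookup-and-delete logic over plain dicts (no Weaviate client; the store and helper below are illustrative):

```python
import uuid

# In-memory stand-in for the ELYSIA_METADATA__ collection: one object per
# preprocessed collection, keyed by UUID, with a "name" property.
metadata_store = {
    uuid.uuid4(): {"name": "Articles", "summary": "News articles"},
    uuid.uuid4(): {"name": "Products", "summary": "Product catalogue"},
}

def delete_metadata_object(store: dict, collection_name: str) -> None:
    # Mirror Filter.by_property("name").equal(collection_name) with limit=1.
    matches = [uid for uid, obj in store.items() if obj["name"] == collection_name]
    if not matches:
        raise Exception(f"Metadata for {collection_name} does not exist")
    # Mirror metadata_collection.data.delete_by_id(uuid).
    del store[matches[0]]

delete_metadata_object(metadata_store, "Articles")
```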

edit_preprocessed_collection(collection_name, client_manager=None, named_vectors=None, summary=None, mappings=None, fields=None)

Edit a preprocessed collection. This function allows you to edit the named vectors, summary, mappings, and fields of a preprocessed collection. It does so by updating the ELYSIA_METADATA__ collection. Find available mappings in the elysia.util.return_types module.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to edit.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None
named_vectors list[dict]

The named vectors to update. This has fields "name", "enabled", and "description". The "name" is used to identify the named vector to change (the name will not change). Set "enabled" to True/False to enable/disable the named vector. Set "description" to describe the named vector. The description of named vectors is not automatically generated by the LLM. Any named vectors that are not provided will not be updated. If None or not provided, the named vectors will not be updated.

None
summary str

The summary to update. The summary is a short description of the collection, generated by the LLM. This will replace the existing summary of the collection. If None or not provided, the summary will not be updated.

None
mappings dict

The mappings to update. The mappings are what the frontend will use to display the collection, and the associated fields. I.e., which fields correspond to which output fields on the frontend. The keys of the outer level of the dictionary are the mapping names, the values are dictionaries with the mappings. The inner dictionary has the frontend (return type) fields as keys, and the corresponding collection fields as values. If None or not provided, the mappings will not be updated.

None
fields list[dict]

The fields to update. Each element in the list is a dictionary with the following fields:

  - "name": The name of the field. (This is used to identify the field to change; the name will not change.)
  - "description": The description of the field to update.

Any fields that are not provided will not be updated. If None or not provided, the fields will not be updated.

None

Returns:

Name Type Description
dict dict

The updated preprocessed collection.

Source code in elysia/preprocess/collection.py
def edit_preprocessed_collection(
    collection_name: str,
    client_manager: ClientManager | None = None,
    named_vectors: list[dict] | None = None,
    summary: str | None = None,
    mappings: dict[str, dict[str, str]] | None = None,
    fields: list[dict[str, str] | None] | None = None,
) -> dict:
    """
    Edit a preprocessed collection.
    This function allows you to edit the named vectors, summary, mappings, and fields of a preprocessed collection.
    It does so by updating the ELYSIA_METADATA__ collection.
    Find available mappings in the `elysia.util.return_types` module.

    Args:
        collection_name (str): The name of the collection to edit.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.
        named_vectors (list[dict]): The named vectors to update. This has fields "name", "enabled", and "description".
            The "name" is used to identify the named vector to change (the name will not change).
            Set "enabled" to True/False to enable/disable the named vector.
            Set "description" to describe the named vector.
            The description of named vectors is not automatically generated by the LLM.
            Any named vectors that are not provided will not be updated.
            If None or not provided, the named vectors will not be updated.
        summary (str): The summary to update.
            The summary is a short description of the collection, generated by the LLM.
            This will replace the existing summary of the collection.
            If None or not provided, the summary will not be updated.
        mappings (dict): The mappings to update.
            The mappings are what the frontend will use to display the collection, and the associated fields.
            I.e., which fields correspond to which output fields on the frontend.
            The keys of the outer level of the dictionary are the mapping names, the values are dictionaries with the mappings.
            The inner dictionary has the frontend (return type) fields as keys, and the corresponding collection fields as values.
            If None or not provided, the mappings will not be updated.
        fields (list[dict]): The fields to update.
            Each element in the list is a dictionary with the following fields:
            - "name": The name of the field. (This is used to identify the field to change, the name will not change).
            - "description": The description of the field to update.
            Any fields that are not provided will not be updated.
            If None or not provided, the fields will not be updated.

    Returns:
        dict: The updated preprocessed collection.
    """

    return asyncio_run(
        edit_preprocessed_collection_async(
            collection_name, client_manager, named_vectors, summary, mappings, fields
        )
    )
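The mappings argument can be sanity-checked locally before calling the function. The sketch below mirrors the validation and empty-field fill performed inside the edit function, using a simplified stand-in for elysia.util.return_types.types_dict (the "document" type and its fields here are illustrative, not the library's actual return types):

```python
# Simplified stand-in for elysia.util.return_types.types_dict; the real
# mapping types and their frontend fields are defined in that module.
types_dict = {
    "document": {"title": str, "author": str, "content": str},
}

def format_mappings(mappings: dict) -> dict:
    # Mirror the validation in edit_preprocessed_collection_async: unknown
    # mapping types or frontend fields raise a ValueError, and any frontend
    # field you do not map is filled with an empty string.
    for mapping_type, mapping in mappings.items():
        if mapping_type not in types_dict:
            raise ValueError(f"Invalid mapping type: {mapping_type}")
        for frontend_field in mapping:
            if frontend_field not in types_dict[mapping_type]:
                raise ValueError(f"Invalid field name: {frontend_field}")
        for frontend_field in types_dict[mapping_type]:
            mapping.setdefault(frontend_field, "")
    return mappings

# Map the frontend "title" and "content" fields to collection fields;
# "author" is left out and gets filled with "".
formatted = format_mappings({"document": {"title": "headline", "content": "body"}})
```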

edit_preprocessed_collection_async(collection_name, client_manager=None, named_vectors=None, summary=None, mappings=None, fields=None) async

Async version of edit_preprocessed_collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to edit.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None
named_vectors list[dict]

The named vectors to update. This has fields "name", "enabled", and "description". The "name" is used to identify the named vector to change (the name will not change). Set "enabled" to True/False to enable/disable the named vector. Set "description" to describe the named vector. The description of named vectors is not automatically generated by the LLM. Any named vectors that are not provided will not be updated. If None or not provided, the named vectors will not be updated.

None
summary str

The summary to update. The summary is a short description of the collection, generated by the LLM. This will replace the existing summary of the collection. If None or not provided, the summary will not be updated.

None
mappings dict

The mappings to update. The mappings are what the frontend will use to display the collection, and the associated fields. I.e., which fields correspond to which output fields on the frontend. The keys of the outer level of the dictionary are the mapping names, the values are dictionaries with the mappings. The inner dictionary has the frontend (return type) fields as keys, and the corresponding collection fields as values. If None or not provided, the mappings will not be updated.

None
fields list[dict]

The fields to update. Each element in the list is a dictionary with the following fields:

  - "name": The name of the field. (This is used to identify the field to change; the name will not change.)
  - "description": The description of the field to update.

Any fields that are not provided will not be updated. If None or not provided, the fields will not be updated.

None

Returns:

Name Type Description
dict dict

The updated preprocessed collection.

Source code in elysia/preprocess/collection.py
async def edit_preprocessed_collection_async(
    collection_name: str,
    client_manager: ClientManager | None = None,
    named_vectors: list[dict] | None = None,
    summary: str | None = None,
    mappings: dict[str, dict[str, str]] | None = None,
    fields: list[dict[str, str] | None] | None = None,
) -> dict:
    """
    Async version of `edit_preprocessed_collection`.

    Args:
        collection_name (str): The name of the collection to edit.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.
        named_vectors (list[dict]): The named vectors to update. This has fields "name", "enabled", and "description".
            The "name" is used to identify the named vector to change (the name will not change).
            Set "enabled" to True/False to enable/disable the named vector.
            Set "description" to describe the named vector.
            The description of named vectors is not automatically generated by the LLM.
            Any named vectors that are not provided will not be updated.
            If None or not provided, the named vectors will not be updated.
        summary (str): The summary to update.
            The summary is a short description of the collection, generated by the LLM.
            This will replace the existing summary of the collection.
            If None or not provided, the summary will not be updated.
        mappings (dict): The mappings to update.
            The mappings are what the frontend will use to display the collection, and the associated fields.
            I.e., which fields correspond to which output fields on the frontend.
            The keys of the outer level of the dictionary are the mapping names, the values are dictionaries with the mappings.
            The inner dictionary has the frontend (return type) fields as keys, and the corresponding collection fields as values.
            If None or not provided, the mappings will not be updated.
        fields (list[dict]): The fields to update.
            Each element in the list is a dictionary with the following fields:
            - "name": The name of the field. (This is used to identify the field to change, the name will not change).
            - "description": The description of the field to update.
            Any fields that are not provided will not be updated.
            If None or not provided, the fields will not be updated.

    Returns:
        dict: The updated preprocessed collection.
    """

    if client_manager is None:
        client_manager = ClientManager()
        close_clients_after_completion = True
    else:
        close_clients_after_completion = False

    async with client_manager.connect_to_async_client() as client:
        metadata_name = "ELYSIA_METADATA__"

        # check if the collection itself exists
        if not await client.collections.exists(collection_name):
            raise Exception(f"Collection {collection_name} does not exist")

        # check if the metadata collection exists
        if not await client.collections.exists(metadata_name):
            raise Exception("Metadata collection does not exist")

        metadata_collection = client.collections.get(metadata_name)
        metadata = await metadata_collection.query.fetch_objects(
            filters=Filter.by_property("name").equal(collection_name),
            limit=1,
        )
        if len(metadata.objects) == 0:
            raise Exception(f"Metadata for {collection_name} does not exist")
        uuid = metadata.objects[0].uuid
        properties: dict = metadata.objects[0].properties  # type: ignore

        # update the named vectors
        if named_vectors is not None:
            for named_vector in named_vectors:
                for property_named_vector in properties["named_vectors"]:
                    if property_named_vector["name"] == named_vector["name"]:

                        if named_vector["enabled"] is not None:
                            property_named_vector["enabled"] = named_vector["enabled"]

                        if named_vector["description"] is not None:
                            property_named_vector["description"] = named_vector[
                                "description"
                            ]

        # update the summary
        if summary is not None:
            properties["summary"] = summary

        # update the mappings
        if mappings is not None:
            if "table" in mappings:
                mappings["table"] = {
                    field["name"]: field["name"] for field in properties["fields"]
                }

            # format all mappings
            for mapping_type, mapping in mappings.items():
                if mapping_type == "table":
                    continue

                # check if the mapping_type is valid
                if mapping_type not in rt.types_dict:
                    raise ValueError(
                        f"Invalid mapping type: {mapping_type}. Valid mapping types are: {list(rt.types_dict.keys())}"
                    )

                # check if the mapping is valid
                for field_name, field_value in mapping.items():
                    if field_name not in rt.types_dict[mapping_type]:
                        raise ValueError(
                            f"Invalid field name: {field_name} for mapping type: {mapping_type}. "
                            f"Valid fields are: {list(rt.types_dict[mapping_type].keys())}"
                        )

                # add empty fields
                for true_field in rt.types_dict[mapping_type]:
                    if true_field not in mapping:
                        mapping[true_field] = ""

            properties["mappings"] = mappings

            # check if the `conversation_id` field is in the mapping (required for conversation type)
            if "conversation" in mappings and (
                mappings["conversation"]["conversation_id"] is None
                or mappings["conversation"]["conversation_id"] == ""
            ):
                raise ValueError(
                    "Conversation type requires a conversation_id field, but none was found in the mappings for conversation. "
                )

            if "conversation" in mappings and (
                mappings["conversation"]["message_id"] is None
                or mappings["conversation"]["message_id"] == ""
            ):
                raise ValueError(
                    "Conversation type requires a message_id field, but none was found in the mappings for conversation. "
                )

            # check if there is a message type as well as conversation
            if "message" not in mappings and "conversation" not in mappings:
                raise ValueError(
                    "Conversation type requires message type to also be set as a fallback."
                )

        # update the fields
        if fields is not None:
            for field in fields:
                for property_field in properties["fields"]:
                    if field is not None and property_field["name"] == field["name"]:
                        property_field["description"] = field["description"]

        format_dict_to_serialisable(properties)

        # update the collection
        await metadata_collection.data.update(uuid=uuid, properties=properties)

    if close_clients_after_completion:
        await client_manager.close_clients()

    return properties
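The named-vector update rule (only vectors listed in the update are touched, and only their non-None fields change) can be sketched on plain dicts; the data below is illustrative:

```python
def merge_named_vectors(existing: list[dict], updates: list[dict]) -> list[dict]:
    # Mirror the update loop in edit_preprocessed_collection_async: match on
    # "name", and update "enabled"/"description" only when a non-None value
    # is supplied. Vectors not mentioned in `updates` are left untouched.
    for update in updates:
        for vec in existing:
            if vec["name"] == update["name"]:
                if update.get("enabled") is not None:
                    vec["enabled"] = update["enabled"]
                if update.get("description") is not None:
                    vec["description"] = update["description"]
    return existing

vectors = [
    {"name": "title_vector", "enabled": True, "description": ""},
    {"name": "body_vector", "enabled": True, "description": "body text"},
]
# Disable title_vector; leave its description and body_vector untouched.
merged = merge_named_vectors(
    vectors, [{"name": "title_vector", "enabled": False, "description": None}]
)
```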

preprocess(collection_names, client_manager=None, min_sample_size=5, max_sample_size=100, num_sample_tokens=30000, settings=environment_settings, force=False)

Preprocess a collection to obtain an LLM-generated summary of the collection, a set of statistics for each field (such as unique categories), and a set of mappings from the fields to the frontend-specific fields in Elysia.

In order:

  1. Evaluate all the data fields and groups/statistics of the data fields as a whole
  2. Write a summary of the collection via an LLM
  3. Evaluate what return types are available for this collection
  4. For each data field in the collection, evaluate what corresponding entry goes to what field in the return type (mapping)
  5. Save as an ELYSIA_METADATA__ collection

Depending on the size of objects in the collection, you can choose the minimum and maximum sample size, which will be used to create a sample of objects for the LLM to create a collection summary. If your objects are particularly large, you can set the sample size to be smaller, to use fewer tokens and speed up the LLM processing. If your objects are small, you can set the sample size to be larger, to get a more accurate summary. This is a trade-off between speed/compute and accuracy.

Note that the pre-processing step only needs to be done once for each collection. The output of this function is cached, so if you run it again, it will not re-process the collection (unless the force flag is set to True).

This function saves the output into a collection called ELYSIA_METADATA__, which is read automatically by Elysia. The output is saved to whatever Weaviate cluster URL/API key you have configured, or set in your environment variables. You can change this by setting wcd_url and wcd_api_key in the settings and passing that Settings object to this function.

Parameters:

Name Type Description Default
collection_names list[str]

The names of the collections to preprocess.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None
min_sample_size int

The minimum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 5.

5
max_sample_size int

The maximum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 100.

100
num_sample_tokens int

The maximum number of tokens in the sample objects used to evaluate the summary. Optional, defaults to 30000.

30000
settings Settings

The settings to use. Optional, defaults to the environment variables/configured settings.

settings
force bool

Whether to force the preprocessor to run even if the collection already exists. Optional, defaults to False.

False
Source code in elysia/preprocess/collection.py
def preprocess(
    collection_names: list[str],
    client_manager: ClientManager | None = None,
    min_sample_size: int = 5,
    max_sample_size: int = 100,
    num_sample_tokens: int = 30000,
    settings: Settings = environment_settings,
    force: bool = False,
) -> None:
    """
    Preprocess a collection to obtain an LLM-generated summary of the collection,
    a set of statistics for each field (such as unique categories), and a set of mappings
    from the fields to the frontend-specific fields in Elysia.

    In order:
    1. Evaluate all the data fields and groups/statistics of the data fields as a whole
    2. Write a summary of the collection via an LLM
    3. Evaluate what return types are available for this collection
    4. For each data field in the collection, evaluate what corresponding entry goes to what field in the return type (mapping)
    5. Save as an ELYSIA_METADATA__ collection

    Depending on the size of objects in the collection, you can choose the minimum and maximum sample size,
    which will be used to create a sample of objects for the LLM to create a collection summary.
    If your objects are particularly large, you can set the sample size to be smaller, to use fewer tokens and speed up the LLM processing.
    If your objects are small, you can set the sample size to be larger, to get a more accurate summary.
    This is a trade-off between speed/compute and accuracy.

    But note that the pre-processing step only needs to be done once for each collection.
    The output of this function is cached, so that if you run it again, it will not re-process the collection (unless the force flag is set to True).

    This function saves the output into a collection called ELYSIA_METADATA__, which is read automatically by Elysia.
    This is saved to whatever Weaviate cluster URL/API key you have configured, or in your environment variables.
    You can change this by setting the `wcd_url` and `wcd_api_key` in the settings, and pass this Settings object to this function.

    Args:
        collection_names (list[str]): The names of the collections to preprocess.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.
        min_sample_size (int): The minimum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 5.
        max_sample_size (int): The maximum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 100.
        num_sample_tokens (int): The maximum number of tokens in the sample objects used to evaluate the summary. Optional, defaults to 30000.
        settings (Settings): The settings to use. Optional, defaults to the environment variables/configured settings.
        force (bool): Whether to force the preprocessor to run even if the collection already exists. Optional, defaults to False.
    """

    asyncio_run(
        _preprocess_async(
            collection_names,
            client_manager,
            min_sample_size,
            max_sample_size,
            num_sample_tokens,
            settings,
            force,
        )
    )
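The sampling trade-off described above reduces to a clamp on the number of sampled objects. This is a simplified model of the heuristic in preprocess_async, which estimates per-object tokens from the first sampled object; the function name below is illustrative, not part of the library:

```python
def effective_sample_size(
    per_object_tokens: int,
    min_sample_size: int = 5,
    max_sample_size: int = 100,
    num_sample_tokens: int = 30000,
) -> int:
    # Aim for roughly num_sample_tokens in total, but never fewer than
    # min_sample_size objects and never more than max_sample_size.
    target = max(min_sample_size, num_sample_tokens // per_object_tokens)
    return min(max_sample_size, target)

small_objects = effective_sample_size(per_object_tokens=100)    # capped at max_sample_size
large_objects = effective_sample_size(per_object_tokens=10000)  # floored at min_sample_size
```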

preprocess_async(collection_name, client_manager=None, min_sample_size=10, max_sample_size=20, num_sample_tokens=30000, force=False, percentage_correct_threshold=0.3, settings=environment_settings) async

Preprocess a collection to obtain an LLM-generated summary of the collection, a set of statistics for each field (such as unique categories), and a set of mappings from the fields to the frontend-specific fields in Elysia.

In order:

  1. Evaluate all the data fields and groups/statistics of the data fields as a whole
  2. Write a summary of the collection via an LLM
  3. Evaluate what return types are available for this collection
  4. For each data field in the collection, evaluate what corresponding entry goes to what field in the return type (mapping)
  5. Save as an ELYSIA_METADATA__ collection

Parameters:

Name Type Description Default
collection_name str

The name of the collection to preprocess.

required
client_manager ClientManager

The client manager to use.

None
min_sample_size int

The minimum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 10.

10
max_sample_size int

The maximum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 20.

20
num_sample_tokens int

The approximate number of tokens to sample from the collection to evaluate the summary. The preprocessor will aim to use this many tokens in the sample objects, but will not exceed the maximum number of objects specified by max_sample_size, and will always use at least min_sample_size objects.

30000
force bool

Whether to force the preprocessor to run even if the collection already exists. Optional, defaults to False.

False
percentage_correct_threshold float

The threshold for the fraction of correctly mapped fields required in the data mapping. Optional, defaults to 0.3.

0.3
settings Settings

The settings to use. Optional, defaults to the environment variables/configured settings.

settings

Returns:

Type Description
AsyncGenerator[dict, None]

AsyncGenerator[dict, None]: A generator that yields dictionaries with the status updates and progress of the preprocessor.

Source code in elysia/preprocess/collection.py
async def preprocess_async(
    collection_name: str,
    client_manager: ClientManager | None = None,
    min_sample_size: int = 10,
    max_sample_size: int = 20,
    num_sample_tokens: int = 30000,
    force: bool = False,
    percentage_correct_threshold: float = 0.3,
    settings: Settings = environment_settings,
) -> AsyncGenerator[dict, None]:
    """
    Preprocess a collection to obtain an LLM-generated summary of the collection,
    a set of statistics for each field (such as unique categories), and a set of mappings
    from the fields to the frontend-specific fields in Elysia.

    In order:

    1. Evaluate all the data fields and groups/statistics of the data fields as a whole
    2. Write a summary of the collection via an LLM
    3. Evaluate what return types are available for this collection
    4. For each data field in the collection, evaluate what corresponding entry goes to what field in the return type (mapping)
    5. Save as an ELYSIA_METADATA__ collection

    Args:
        collection_name (str): The name of the collection to preprocess.
        client_manager (ClientManager): The client manager to use.
        min_sample_size (int): The minimum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 10.
        max_sample_size (int): The maximum number of objects to sample from the collection to evaluate the statistics/summary. Optional, defaults to 20.
        num_sample_tokens (int): The approximate number of tokens to sample from the collection to evaluate the summary.
            The preprocessor will aim to use this many tokens in the sample objects,
            but will not exceed the maximum number of objects specified by `max_sample_size`, and will always use at least `min_sample_size` objects.
        force (bool): Whether to force the preprocessor to run even if the collection already exists. Optional, defaults to False.
        percentage_correct_threshold (float): The threshold for the fraction of correctly mapped fields required in the data mapping. Optional, defaults to 0.3.
        settings (Settings): The settings to use. Optional, defaults to the environment variables/configured settings.

    Returns:
        AsyncGenerator[dict, None]: A generator that yields dictionaries with the status updates and progress of the preprocessor.
    """

    collection_summariser_prompt = dspy.ChainOfThought(CollectionSummariserPrompt)
    return_type_prompt = dspy.ChainOfThought(ReturnTypePrompt)
    data_mapping_prompt = dspy.ChainOfThought(DataMappingPrompt)
    prompt_suggestor_prompt = dspy.ChainOfThought(PromptSuggestorPrompt)

    lm = load_base_lm(settings)
    logger = settings.logger
    process_update = ProcessUpdate(collection_name, len(rt.specific_return_types) + 5)

    if client_manager is None:
        client_manager = ClientManager(
            wcd_url=settings.WCD_URL, wcd_api_key=settings.WCD_API_KEY
        )
        close_clients_after_completion = True
    else:
        close_clients_after_completion = False

    try:
        # Check if the collection exists
        async with client_manager.connect_to_async_client() as client:
            if not await client.collections.exists(collection_name):
                raise Exception(f"Collection {collection_name} does not exist!")

        # Check if the preprocessed collection exists
        if (
            await preprocessed_collection_exists_async(collection_name, client_manager)
            and not force
        ):
            logger.info(f"Preprocessed metadata for {collection_name} already exists!")
            return

        # Get the collection and its properties
        async with client_manager.connect_to_async_client() as client:
            collection = client.collections.get(collection_name)
            properties = await async_get_collection_data_types(client, collection_name)

        # get number of items in collection
        agg = await collection.aggregate.over_all(total_count=True)
        len_collection: int = agg.total_count  # type: ignore

        # Randomly sample up to max_sample_size object indices for the summary
        indices = random.sample(
            range(len_collection),
            max(min(max_sample_size, len_collection), 1),
        )

        # Get first object to estimate token count
        obj = await collection.query.fetch_objects(limit=1, offset=indices[0])
        token_count_0 = len(nlp(str(obj.objects[0].properties)))
        subset_objects: list[dict] = [obj.objects[0].properties]  # type: ignore

        # Get number of objects to sample to get close to num_sample_tokens
        num_sample_objects = max(min_sample_size, num_sample_tokens // token_count_0)

        for index in indices[1:num_sample_objects]:
            obj = await collection.query.fetch_objects(limit=1, offset=index)
            subset_objects.append(obj.objects[0].properties)  # type: ignore

        # Estimate number of tokens
        logger.debug(
            f"Estimated token count of sample: {token_count_0 * len(subset_objects)}"
        )
        logger.debug(f"Number of objects in sample: {len(subset_objects)}")

        # Summarise the collection using LLM and the subset of the data
        summary, field_descriptions = await _summarise_collection(
            collection_summariser_prompt,
            properties,
            subset_objects,
            len_collection,
            settings,
            lm,
        )

        yield await process_update(
            message="Generated summary of collection",
        )

        # Use the sampled subset when the collection is large; otherwise fetch every object
        if len_collection > max_sample_size:
            full_response = subset_objects
        else:
            weaviate_resp = await collection.query.fetch_objects(limit=len_collection)
            full_response = [obj.properties for obj in weaviate_resp.objects]

        # Initialise the output
        named_vectors, vectoriser = await _find_vectorisers(collection)
        out = {
            "name": collection_name,
            "length": len_collection,
            "summary": summary,
            "index_properties": await _evaluate_index_properties(collection),
            "named_vectors": named_vectors,
            "vectorizer": vectoriser,
            "fields": [],
            "mappings": {},
        }

        # Evaluate the summary statistics of each field
        for property in properties:
            out["fields"].append(
                await _evaluate_field_statistics(
                    collection, properties, property, len_collection, full_response
                )
            )
            if property in field_descriptions:
                out["fields"][-1]["description"] = field_descriptions[property]
            else:
                out["fields"][-1]["description"] = ""

        yield await process_update(
            message="Evaluated field statistics",
        )

        return_types = await _evaluate_return_types(
            return_type_prompt,
            summary,
            properties,
            subset_objects,
            settings,
            lm,
        )

        yield await process_update(
            message="Evaluated return types",
        )
        process_update.update_total(len(return_types) + 5)

        # suggest prompts
        out["prompts"] = await _suggest_prompts(
            prompt_suggestor_prompt,
            out,
            subset_objects,
            settings,
            lm,
        )

        yield await process_update(
            message="Created suggestions for prompts",
        )

        # For each return type created above, define the mappings from the properties to the frontend types
        mappings = {}
        for return_type in return_types:
            fields = rt.types_dict[return_type]

            mapping = await _define_mappings(
                data_mapping_prompt,
                mapping_type=return_type,
                input_fields=list(fields.keys()),
                output_fields=list(properties.keys()),
                properties=properties,
                collection_information=out,
                example_objects=subset_objects,
                settings=settings,
                lm=lm,
            )

            # remove any extra fields the model may have added
            mapping = {k: v for k, v in mapping.items() if k in list(fields.keys())}

            yield await process_update(
                message=f"Defined mappings for {return_type}",
            )

            mappings[return_type] = mapping

        new_return_types = []
        for return_type in return_types:

            # check if the `conversation_id` field is in the mapping (required for conversation type)
            if return_type == "conversation" and (
                (
                    mappings[return_type]["conversation_id"] is None
                    or mappings[return_type]["conversation_id"] == ""
                )
                or (
                    mappings[return_type]["message_id"] is None
                    or mappings[return_type]["message_id"] == ""
                )
            ):
                continue

            # Keep the return type if the fraction of successfully mapped fields meets the threshold
            num_missing = sum([m == "" for m in list(mappings[return_type].values())])
            perc_correct = 1 - (num_missing / len(mappings[return_type].keys()))
            if perc_correct >= percentage_correct_threshold:
                new_return_types.append(return_type)

        # If no return types are left, fall back to generic
        if len(new_return_types) == 0:

            # Map for generic
            mapping = await _define_mappings(
                data_mapping_prompt,
                mapping_type="generic",
                input_fields=list(rt.generic.keys()),
                output_fields=list(properties.keys()),
                properties=properties,
                collection_information=out,
                example_objects=subset_objects,
                settings=settings,
                lm=lm,
            )
            yield await process_update(
                message="No display types found, defined mappings for generic",
            )

            # remove any extra fields the model may have added
            mapping = {k: v for k, v in mapping.items() if k in list(rt.generic.keys())}
            mappings["generic"] = mapping

            # re-check the threshold for missing fields on the generic mapping
            num_missing = sum([m == "" for m in list(mappings["generic"].values())])
            perc_correct = 1 - (num_missing / len(mappings["generic"].keys()))
            if perc_correct >= percentage_correct_threshold:
                new_return_types = ["generic"]

        # Add the mappings to the output
        out["mappings"] = {
            return_type: mappings[return_type] for return_type in new_return_types
        }

        # always include the table return type
        out["mappings"]["table"] = {field: field for field in properties.keys()}

        # Delete existing metadata if it exists
        if await preprocessed_collection_exists_async(collection_name, client_manager):
            await delete_preprocessed_collection_async(collection_name, client_manager)

        # Save final metadata to a collection
        async with client_manager.connect_to_async_client() as client:
            if await client.collections.exists("ELYSIA_METADATA__"):
                metadata_collection = client.collections.get("ELYSIA_METADATA__")
            else:
                metadata_collection = await client.collections.create(
                    f"ELYSIA_METADATA__",
                    vectorizer_config=Configure.Vectorizer.none(),
                    properties=[
                        Property(
                            name="name",
                            data_type=DataType.TEXT,
                        ),
                        Property(
                            name="length",
                            data_type=DataType.NUMBER,
                        ),
                        Property(
                            name="summary",
                            data_type=DataType.TEXT,
                        ),
                        Property(
                            name="index_properties",
                            data_type=DataType.OBJECT,
                            nested_properties=[
                                Property(
                                    name="isNullIndexed",
                                    data_type=DataType.BOOL,
                                ),
                                Property(
                                    name="isLengthIndexed",
                                    data_type=DataType.BOOL,
                                ),
                                Property(
                                    name="isTimestampIndexed",
                                    data_type=DataType.BOOL,
                                ),
                            ],
                        ),
                        Property(
                            name="named_vectors",
                            data_type=DataType.OBJECT_ARRAY,
                            nested_properties=[
                                Property(
                                    name="name",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="vectorizer",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="model",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="source_properties",
                                    data_type=DataType.TEXT_ARRAY,
                                ),
                                Property(
                                    name="enabled",
                                    data_type=DataType.BOOL,
                                ),
                                Property(
                                    name="description",
                                    data_type=DataType.TEXT,
                                ),
                            ],
                        ),
                        Property(
                            name="vectorizer",
                            data_type=DataType.OBJECT,
                            nested_properties=[
                                Property(name="vectorizer", data_type=DataType.TEXT),
                                Property(name="model", data_type=DataType.TEXT),
                            ],
                        ),
                        Property(
                            name="fields",
                            data_type=DataType.OBJECT_ARRAY,
                            nested_properties=[
                                Property(
                                    name="name",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="type",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="description",
                                    data_type=DataType.TEXT,
                                ),
                                Property(
                                    name="range",
                                    data_type=DataType.NUMBER_ARRAY,
                                ),
                                Property(
                                    name="date_range",
                                    data_type=DataType.DATE_ARRAY,
                                ),
                                Property(
                                    name="groups",
                                    data_type=DataType.OBJECT_ARRAY,
                                    nested_properties=[
                                        Property(
                                            name="value",
                                            data_type=DataType.TEXT,
                                        ),
                                        Property(name="count", data_type=DataType.INT),
                                    ],
                                ),
                                Property(
                                    name="date_median",
                                    data_type=DataType.DATE,
                                ),
                                Property(
                                    name="mean",
                                    data_type=DataType.NUMBER,
                                ),
                            ],
                        ),
                        # leave mappings for auto-schema generation
                    ],
                    inverted_index_config=Configure.inverted_index(
                        index_null_state=True,
                    ),
                )
            await metadata_collection.data.insert(out)

        yield await process_update(
            completed=True,
            message="Saved metadata to Weaviate",
        )

    except Exception as e:
        yield await process_update(
            error=f"Error preprocessing collection: {str(e)}",
        )

    finally:
        if close_clients_after_completion:
            await client_manager.close_clients()
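The sampling step in the function above balances a token budget against object-count bounds: it estimates a per-object token count from the first sampled object, targets roughly `num_sample_tokens` tokens in total, and clamps the result between `min_sample_size` and `max_sample_size` (and the collection length). A minimal standalone sketch of that calculation (the helper name below is illustrative, not part of Elysia):

```python
def estimate_sample_size(
    token_count_first_obj: int,
    num_sample_tokens: int,
    min_sample_size: int,
    max_sample_size: int,
    len_collection: int,
) -> int:
    # Aim for roughly num_sample_tokens tokens in total, using the first
    # sampled object's token count as a per-object estimate
    by_budget = num_sample_tokens // max(token_count_first_obj, 1)
    # Never fewer than min_sample_size objects...
    n = max(min_sample_size, by_budget)
    # ...and never more than max_sample_size, or the collection itself
    return min(n, max_sample_size, len_collection)
```

With a token budget of 1000 and objects averaging ~100 tokens, this yields 10 sample objects; very large objects fall back to `min_sample_size`, and very small ones are capped at `max_sample_size`.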

preprocessed_collection_exists(collection_name, client_manager=None)

Check if the preprocessed collection exists in the Weaviate cluster. This function checks whether cached preprocessed metadata exists for the collection. It does so by checking if the collection name appears in the ELYSIA_METADATA__ collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to check.

required
client_manager ClientManager

The client manager to use.

None
Source code in elysia/preprocess/collection.py
def preprocessed_collection_exists(
    collection_name: str, client_manager: ClientManager | None = None
) -> bool:
    """
    Check if the preprocessed collection exists in the Weaviate cluster.
    This function checks whether cached preprocessed metadata exists for the collection.
    It does so by checking if the collection name appears in the ELYSIA_METADATA__ collection.

    Args:
        collection_name (str): The name of the collection to check.
        client_manager (ClientManager): The client manager to use.
    """
    return asyncio_run(
        preprocessed_collection_exists_async(collection_name, client_manager)
    )
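The synchronous wrapper follows the same pattern as the other sync entry points on this page: it delegates to the async variant via an event-loop runner. A minimal sketch of that pattern using the standard library's `asyncio.run` (Elysia's own `asyncio_run` helper may differ, e.g. to handle already-running event loops; the functions below are illustrative stand-ins, not the real API):

```python
import asyncio

async def collection_exists_async(name: str, known: set[str]) -> bool:
    # Stand-in for the real async check, which queries ELYSIA_METADATA__
    return name in known

def collection_exists(name: str, known: set[str]) -> bool:
    # Synchronous facade: run the async implementation to completion
    return asyncio.run(collection_exists_async(name, known))
```

Note that `asyncio.run` raises a `RuntimeError` if called from inside a running event loop, which is one reason a library may ship its own runner helper.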

preprocessed_collection_exists_async(collection_name, client_manager=None) async

Async version of preprocessed_collection_exists.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to check.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None

Returns:

Name Type Description
bool bool

True if the collection exists, False otherwise.

Source code in elysia/preprocess/collection.py
async def preprocessed_collection_exists_async(
    collection_name: str, client_manager: ClientManager | None = None
) -> bool:
    """
    Async version of `preprocessed_collection_exists`.

    Args:
        collection_name (str): The name of the collection to check.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.

    Returns:
        bool: True if the collection exists, False otherwise.
    """
    if client_manager is None:
        client_manager = ClientManager()
        close_clients_after_completion = True
    else:
        close_clients_after_completion = False

    async with client_manager.connect_to_async_client() as client:
        metadata_exists = await client.collections.exists("ELYSIA_METADATA__")
        if not metadata_exists:
            return False

        metadata_collection = client.collections.get("ELYSIA_METADATA__")
        metadata = await metadata_collection.query.fetch_objects(
            filters=Filter.by_property("name").equal(collection_name)
        )

    if close_clients_after_completion:
        await client_manager.close_clients()

    return metadata is not None and len(metadata.objects) > 0
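The async check above reduces to two conditions: the ELYSIA_METADATA__ collection must exist, and it must contain an object whose `name` property equals the collection name. Sketched over plain dictionaries (a stand-in for Weaviate query results, not the real client API):

```python
def metadata_entry_exists(collection_name: str, metadata_objects) -> bool:
    # None models a missing ELYSIA_METADATA__ collection
    if metadata_objects is None:
        return False
    # Mirrors Filter.by_property("name").equal(collection_name)
    matches = [o for o in metadata_objects if o.get("name") == collection_name]
    return len(matches) > 0
```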

view_preprocessed_collection(collection_name, client_manager=None)

View a preprocessed collection. This function returns the preprocessed metadata generated by the preprocess function for a given collection. It does so by querying the ELYSIA_METADATA__ collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to view.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None

Returns:

Name Type Description
dict dict

The preprocessed collection.

Source code in elysia/preprocess/collection.py
def view_preprocessed_collection(
    collection_name: str, client_manager: ClientManager | None = None
) -> dict:
    """
    View a preprocessed collection.
    This function returns the preprocessed metadata generated by the preprocess function for a given collection.
    It does so by querying the ELYSIA_METADATA__ collection.

    Args:
        collection_name (str): The name of the collection to view.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.

    Returns:
        dict: The preprocessed collection.
    """
    return asyncio_run(
        view_preprocessed_collection_async(collection_name, client_manager)
    )

view_preprocessed_collection_async(collection_name, client_manager=None) async

Async version of view_preprocessed_collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to view.

required
client_manager ClientManager

The client manager to use. If not provided, a new ClientManager will be created using the environment variables/configured settings.

None

Returns:

Name Type Description
dict dict

The preprocessed collection.

Source code in elysia/preprocess/collection.py
async def view_preprocessed_collection_async(
    collection_name: str, client_manager: ClientManager | None = None
) -> dict:
    """
    Async version of `view_preprocessed_collection`.

    Args:
        collection_name (str): The name of the collection to view.
        client_manager (ClientManager): The client manager to use.
            If not provided, a new ClientManager will be created using the environment variables/configured settings.

    Returns:
        dict: The preprocessed collection.
    """

    if client_manager is None:
        client_manager = ClientManager()
        close_clients_after_completion = True
    else:
        close_clients_after_completion = False

    async with client_manager.connect_to_async_client() as client:
        metadata_name = "ELYSIA_METADATA__"

        # check if the collection itself exists
        if not await client.collections.exists(collection_name):
            raise Exception(f"Collection {collection_name} does not exist")

        # check if the metadata collection exists
        if not await client.collections.exists(metadata_name):
            raise Exception("Metadata collection does not exist")

        metadata_collection = client.collections.get(metadata_name)
        metadata = await metadata_collection.query.fetch_objects(
            filters=Filter.by_property("name").equal(collection_name),
            limit=1,
        )
        if len(metadata.objects) == 0:
            raise Exception(f"Metadata for {collection_name} does not exist")

        properties: dict = metadata.objects[0].properties  # type: ignore

    if close_clients_after_completion:
        await client_manager.close_clients()

    return properties
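The async viewer above distinguishes three failure modes: the source collection is missing, the metadata collection is missing, or no metadata entry matches the name. A schematic version over plain dictionaries (illustrative only; the real function issues Weaviate queries through the client manager):

```python
def view_metadata(collection_name, collections, metadata_name="ELYSIA_METADATA__"):
    # The source collection itself must exist
    if collection_name not in collections:
        raise Exception(f"Collection {collection_name} does not exist")
    # The metadata collection must exist
    if metadata_name not in collections:
        raise Exception("Metadata collection does not exist")
    # Mirrors the name-filtered fetch with limit=1
    for obj in collections[metadata_name]:
        if obj.get("name") == collection_name:
            return obj
    raise Exception(f"Metadata for {collection_name} does not exist")
```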