Content-Length: 465305 | pFad | http://github.com/apache/airflow/pull/50735

2E Add delete_by_property method in weaviate hook by sjyangkevin · Pull Request #50735 · apache/airflow · GitHub
Skip to content

Add delete_by_property method in weaviate hook #50735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

sjyangkevin
Copy link
Contributor

Motivation

By adding the delete_by_property method, users can delete multiple objects in multiple collections based on various filtering criteria, making data management more targeted and efficient.

Close #42565

Reference

https://weaviate.io/developers/weaviate/search/filters
https://weaviate.io/developers/weaviate/manage-data/delete#delete-multiple-objects

Testing

  1. Two unit tests are created to capture failure scenarios
  2. Various filtering criteria have been tested in Jupyter notebook environment to connect to Weaviate and delete objects in collection.
  3. Failure scenarios also tested in notebook environment.

Testing Code Sample

import os
os.environ["WEAVIATE_URL"] = "<REST_ENDPOINT>"
os.environ["WEAVIATE_API_KEY"] = "<API Key>"

import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter

weaviate_url = os.environ["WEAVIATE_URL"]
weaviate_api_key = os.environ["WEAVIATE_API_KEY"]

# Connect to Weaviate Cloud
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

def delete_by_property(...): ...

# create collection and add data.
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.get("collection_a")

with collection.batch.fixed_size(batch_size=200) as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")

# Execute the deletion
delete_by_property(
    collection_names="collection_a",
    filter_criteria=Filter.by_property("title").equal("Object 1"),
    if_error="continue"
)

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@sjyangkevin sjyangkevin force-pushed the issues/42565/add-delete-by-properties-to-weaviate-hook branch from a609c21 to f65174f Compare May 17, 2025 23:41
@sjyangkevin
Copy link
Contributor Author

sjyangkevin commented May 18, 2025

Add a DAG test, which delete the objects properly.

Test Scenarios

  • Create a collection
  • Add 5 objects with a single property "title", with value "Object n"
  • Add 2 objects with properties including "demo" and "album"
  • Delete by "title", using "contains_any" with value of "Object" -> which deletes all the objects has property of "title"
  • Delete by "album" and "demo", where "album" is "Album 3" -> delete the objects with property "album" of value "album 3"

Screenshot from 2025-05-17 20-23-22

@eladkal eladkal requested review from Lee-W and utkarsharma2 May 18, 2025 04:45
Copy link
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, it looks good. left a few nits

@sjyangkevin sjyangkevin force-pushed the issues/42565/add-delete-by-properties-to-weaviate-hook branch from f65174f to 55772ae Compare May 20, 2025 02:09
@sjyangkevin sjyangkevin requested a review from Lee-W May 20, 2025 03:05
@sjyangkevin sjyangkevin force-pushed the issues/42565/add-delete-by-properties-to-weaviate-hook branch from 0217d64 to 51d15e3 Compare May 20, 2025 03:25
self.log.error(e)
failed_collection_list.append(collection_name)
elif if_error == "stop":
raise e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we guard other cases? (It normally won't happen, but could be a typo or so)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I am actually thinking about add one more generic Exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I feel this can result in the logic in the except block being duplicated. Not sure if there is a good way to implement it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really need elif if_error == "stop" here. we probably could try

else:
    raise e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am facing a trade off here. I can define a function, just wondering if that is necessary, or there is a better way to keep code DRY.

Option 1

except (
    weaviate.exceptions.UnexpectedStatusCodeException,
    weaviate.exceptions.WeaviateDeleteManyError,
    Exception # capture generic exception, but the above two can also be captured by this, keeping for better readability
) as e:
    if if_error == "continue":
        self.log.error(e)
        failed_collection_list.append(collection_name)
    else:
        raise e

Option 2

except (
    weaviate.exceptions.UnexpectedStatusCodeException,
    weaviate.exceptions.WeaviateDeleteManyError
) as e:
    if if_error == "continue":
        self.log.error(e)
        failed_collection_list.append(collection_name)
    else:
        raise e
except Exception as e: # use another except block, but code don't follow DRY
   if if_error == "continue":
      self.log.error(e)
      failed_collection_list.append(collection_name)
    else:
        raise e

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any known exception other than those 2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I haven’t observed any failure while running the DAG testing. Probably due to the number of objects are small if the Filter is defined proper. For the two exceptions, I found those from the client API document, one is related to the connection and one is related to the delete_many operation. 🤔 I think I can define some invalid filters to see what issue may be raised. Like filter on a property that doesn’t exist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We should avoid using Exception and AirflowException unless necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I run multiple tests with invalid filter, and the WeaviateDeleteManyError is captured. Basically, in the method, there are two main operations. The first one is to get the collection.

Scenarios

  1. Collection not found: WeaviateDeleteManyError is captured, e.g. could not find class Collection_c in schema.
  2. Connection issue, I think this should be captured by UnexpectedStatusCodeException
  3. Invalid filter, WeaviateDeleteManyError is captured. e.g., no such prop with name 'label' found in class

Screenshot from 2025-05-22 01-12-44

@sjyangkevin sjyangkevin force-pushed the issues/42565/add-delete-by-properties-to-weaviate-hook branch from 51d15e3 to 55c377f Compare May 22, 2025 05:11
@sjyangkevin sjyangkevin requested a review from Lee-W May 22, 2025 05:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add delete_by_property method in weaviate hook
2 participants








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/pull/50735

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy