Creating `S3KeysUpsertedTrigger` to "Watch" an S3 Asset by jroachgolf84 · Pull Request #50791 · apache/airflow


Draft · wants to merge 5 commits into main

Conversation

jroachgolf84 (Contributor) commented May 19, 2025

*This description will be updated once the PR moves out of the "Draft" state.

I'm building a Trigger that monitors a certain key in S3, waiting for that key to be created or updated. However, I'm having difficulty persisting state between Trigger runs. Once a Trigger event is yielded and the run method completes execution, the last_activity_time timestamp is wiped. This means that my Trigger runs in a sort of "infinite loop".

What is the best way for us to maintain state across "Asset Watches"? I think this is an inherent downside of using Trigger exclusively to build AssetWatchers, without the ability to "customize" that logic.

EDIT:

I've removed the return; this solves the issue I previously had, but it prevents the Trigger run from ever completing.
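For context, the no-return run loop can be sketched roughly like this (a minimal, Airflow-free simulation, not the PR's actual implementation; `poll`, the event dict shape, and the interval are hypothetical stand-ins for the real S3 trigger logic):

```python
import asyncio
from datetime import datetime, timezone

async def watch_keys(poll, interval=0.01):
    """Sketch of a trigger-style run() loop that never returns, so the
    in-memory watermark survives between yielded events.
    `poll` is a hypothetical callable returning {key: last_modified}."""
    last_activity_time = datetime.min.replace(tzinfo=timezone.utc)
    while True:  # returning here would end the run and wipe the state
        new_watermark = last_activity_time
        for key, modified in poll().items():
            if modified > last_activity_time:
                # yield one event per new/updated key
                yield {"key": key, "last_modified": modified}
                new_watermark = max(new_watermark, modified)
        last_activity_time = new_watermark
        await asyncio.sleep(interval)
```

Because the coroutine never returns, repeated polls of an unchanged bucket yield nothing after the first event; that is exactly the behavior lost when the run method completes and its state is discarded.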

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels May 19, 2025
jroachgolf84 (Author)
cc: @cmarteepants, @RNHTTR

jroachgolf84 (Author)
I guess the best option might be to not return?

jedcunningham (Member)
cc @vincbeck, who might have some thoughts here

vincbeck (Contributor)
The only viable option, I think, would be to record in the database when the last event occurred for a given asset associated to a trigger (table asset_trigger). We could pass that value down to the trigger, which can then use it however it wants in its implementation. I say "viable option" because there can be multiple triggerers in an environment, so the only central place to record such things is the database. Any attempt to save state in the triggerer itself would fail as soon as 2 (or more) are used.

This is definitely doable and not that hard I think. I do not have the bandwidth to do it now but happy to review :)

dstandish (Contributor)
> The only viable option, I think, would be to record in the database when the last event occurred for a given asset associated to a trigger (table asset_trigger). We could pass that value down to the trigger, which can then use it however it wants in its implementation. I say "viable option" because there can be multiple triggerers in an environment, so the only central place to record such things is the database. Any attempt to save state in the triggerer itself would fail as soon as 2 (or more) are used.
>
> This is definitely doable and not that hard I think. I do not have the bandwidth to do it now but happy to review :)

We have to bear in mind that there isn't necessarily a perfect relationship between Airflow's event timestamp and the external one.

An obvious example: an S3 file lands with timestamp X. Airflow records the event timestamp 10 seconds after X. But between X and X + 10, more files land in the bucket. So if you take Airflow's event timestamp as the high watermark, you'll miss files.

So what we're really talking about here, is watermarking. In general, you need to track the actual timestamps from the external system when doing this kind of thing.
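That watermarking idea can be sketched as follows (the function name and the tuple shape are hypothetical, not from the PR): filter on the external LastModified timestamps, and advance the watermark to the maximum *external* timestamp seen, never to Airflow's own event time.

```python
from datetime import datetime, timezone

def advance_watermark(objects, watermark):
    """Watermarking sketch. `objects` is a list of (key, last_modified)
    tuples as reported by the external system (e.g. S3 LastModified).
    Returns the objects strictly newer than the current watermark, plus
    the new watermark taken from the external timestamps themselves."""
    fresh = [(key, ts) for key, ts in objects if ts > watermark]
    new_watermark = max((ts for _, ts in objects), default=watermark)
    return fresh, new_watermark
```

Since the watermark only ever moves to a timestamp the external system actually reported, files that landed between the event and Airflow's recording of it are picked up on the next poll instead of being skipped.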

jroachgolf84 (Author)
> An obvious example: an S3 file lands with timestamp X. Airflow records the event timestamp 10 seconds after X. But between X and X + 10, more files land in the bucket. So if you take Airflow's event timestamp as the high watermark, you'll miss files.

I included a "safe" timestamp for this; it is most certainly top-of-mind.

Comment on lines +160 to +162:

```python
self.from_datetime = (
    datetime.fromisoformat(from_datetime)
    if isinstance(from_datetime, str)
    else from_datetime
)
```
Member

Supporting str seems like an unnecessary feature to me. Users can do it themselves.

Member

This should probably also call coerce_datetime or something to ensure the value is an aware datetime. Otherwise this would fail with a somewhat cryptic error inside the hook.
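A minimal version of that coercion, using only the stdlib (the real provider code would presumably use Airflow's own timezone utilities, and treating naive input as UTC is an assumption here):

```python
from datetime import datetime, timezone

def coerce_aware(value: datetime) -> datetime:
    """Return an aware datetime, treating naive input as UTC, so that a
    later comparison against S3's aware LastModified timestamps cannot
    raise a cryptic naive-vs-aware TypeError inside the hook."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value
```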
