-
Notifications
You must be signed in to change notification settings - Fork 15.1k
Creating S3KeysUpsertedTrigger
to "Watch" an S3 Asset
#50791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cc: @cmarteepants, @RNHTTR |
I guess the best option might be to not |
cc @vincbeck, who might have some thoughts here |
The only viable option, I think, would be to record in the database when was the last event for a given trigger associated to a trigger (table This is definitely doable and not that hard I think. I do not have the bandwidth to do it now but happy to review :) |
We have to bear in mind that there isn't nessarily a perfect relationship between airflow's event timestamp and the external one. An obvious example is, s3 file lands with timestamp X. Airflow records the event timestamp 10 seconds after X. But between X and X + 10, more files land in the bucket. So if you take airflow's event timestamp as the high watermark, you'll miss files. So what we're really talking about here, is watermarking. In general, you need to track the actual timestamps from the external system when doing this kind of thing. |
I included a "safe" timestamp for this, this is most certainly top-of-mind. |
self.from_datetime = datetime.fromisoformat(from_datetime) \ | ||
if isinstance(from_datetime, str) \ | ||
else from_datetime |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Supporting str seems like an unnecessary feature to me. Users can do it themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably also call coerce_datetime
or something to ensure the value is an aware datetime. Otherwise this would fail with a somewhat cryptic error inside the hook.
*This description will be changed once it moved out of "Draft" state.
I'm building a Trigger that would be used to monitor a certain key in S3, waiting for that key to exist or be updated. However, I'm having difficulty persisting state between Trigger runs. Once a Trigger event(s) is yielded and the
run
method completes execution. Thelast_activity_time
timestamp is wiped. This means that my Trigger runs in a sort of "infinite loop".What is the best way for us to maintain state across "Asset Watches"? I think this is an inherent downside of using Trigger exclusively to build
AssetWatchers
, without the ability to "customize" that logic.EDIT:
I've removed the
return
, this solves the issue that I previously had, but prevents theTrigger
run
from completing.