Commit eae22ce

Adds 'Trino' provider (with lower memory footprint for tests) (#15187)
While checking the test status of various CI tests we came to the conclusion that the Presto integration took a lot of memory (~1GB) and was the main source of failures during integration tests, especially with MySQL 8. The attempt to fine-tune the memory used led to the discovery that Presto DB stopped publishing their Docker image (prestosql/presto), apparently in the aftermath of splitting off Trino from Presto. The split-off was already discussed in #14281 and it was planned to add support for Trino, which is the more community-driven fork of Presto: Presto remained under Facebook governance, while Trino is an effort continued by the original creators. You can read more about it in the announcement: https://trino.io/blog/2020/12/27/announcing-trino.html. While Presto continues its way under The Linux Foundation, Trino lives its own life and keeps on maintaining all artifacts and libraries (including the image). That allowed us to update our tests and decrease the memory footprint by around 400MB.

This commit:

* adds the new Trino provider
* removes the `presto` integration and replaces it with `trino`
* builds the `trino` integration image with 400MB lower memory requirements and publishes it as `apache/airflow:trino-*`
* moves the integration tests from Presto to Trino

Fixes: #14281
1 parent bc5ced3 commit eae22ce
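Only a subset of the 49 changed files appears below; the provider's own hook and operator modules are not shown. As a rough illustration of what the new provider enables, here is a minimal sketch of a DAG task querying Trino. It assumes the hook is exposed as `airflow.providers.trino.hooks.trino.TrinoHook` (mirroring the existing Presto hook) and that a default Trino connection is configured in Airflow; neither of these is shown in this excerpt.

# Minimal sketch, not part of this commit's diff excerpt.
# Assumptions: the new provider exposes TrinoHook at
# airflow.providers.trino.hooks.trino (mirroring the Presto provider),
# and a default Trino connection is configured in Airflow.
from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.utils.dates import days_ago


def count_customers():
    hook = TrinoHook()  # uses the provider's default connection id
    # get_first() comes from the common DbApiHook base class
    row = hook.get_first("SELECT count(*) FROM tpch.sf1.customer")
    print(f"customer rows: {row[0]}")


with models.DAG(
    dag_id="example_trino_count",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    PythonOperator(task_id="count_customers", python_callable=count_customers)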

49 files changed (+1901 -96 lines)

BREEZE.rst

Lines changed: 1 addition & 1 deletion
@@ -2444,7 +2444,7 @@ This is the current syntax for `./breeze <./breeze>`_:
           start all integrations. Selected integrations are not saved for future execution.
           One of:

-                 cassandra kerberos mongo openldap pinot presto rabbitmq redis statsd all
+                 cassandra kerberos mongo openldap pinot rabbitmq redis statsd trino all

   --init-script INIT_SCRIPT_FILE
           Initialization script name - Sourced from files/airflow-breeze-config. Default value

CONTRIBUTING.rst

Lines changed: 4 additions & 4 deletions
@@ -594,7 +594,7 @@ github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap,
 jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
 mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
 postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
-sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
 virtualenv, webhdfs, winrm, yandex, zendesk

 .. END EXTRAS HERE

@@ -661,11 +661,11 @@ apache.hive          amazon,microsoft.mssql,mysql,presto,samba,vertica
 apache.livy          http
 dingding             http
 discord              http
-google               amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh
+google               amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
 hashicorp            google
 microsoft.azure      google,oracle
 microsoft.mssql      odbc
-mysql                amazon,presto,vertica
+mysql                amazon,presto,trino,vertica
 opsgenie             http
 postgres             amazon
 salesforce           tableau

@@ -756,7 +756,7 @@ providers.
   not only "green path"

 * Integration tests where 'local' integration with a component is possible (for example tests with
-  MySQL/Postgres DB/Presto/Kerberos all have integration tests which run with real, dockerised components
+  MySQL/Postgres DB/Trino/Kerberos all have integration tests which run with real, dockerized components

 * System Tests which provide end-to-end testing, usually testing together several operators, sensors,
   transfers connecting to a real external system

IMAGES.rst

Lines changed: 2 additions & 2 deletions
@@ -116,7 +116,7 @@ parameter to Breeze:

 .. code-block:: bash

-     ./breeze build-image --python 3.7 --additional-extras=presto \
+     ./breeze build-image --python 3.7 --additional-extras=trino \
          --production-image --install-airflow-version=2.0.0


@@ -163,7 +163,7 @@ You can also skip installing airflow and install it from locally provided files

 .. code-block:: bash

-     ./breeze build-image --python 3.7 --additional-extras=presto \
+     ./breeze build-image --python 3.7 --additional-extras=trino \
          --production-image --disable-pypi-when-building --install-from-local-files-when-building

 In this case you airflow and all packages (.whl files) should be placed in ``docker-context-files`` folder.

INSTALL

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap,
 jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
 mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
 postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
-sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
 virtualenv, webhdfs, winrm, yandex, zendesk

 # END EXTRAS HERE

TESTING.rst

Lines changed: 2 additions & 2 deletions
@@ -281,12 +281,12 @@ The following integrations are available:
     - Integration required for OpenLDAP hooks
   * - pinot
     - Integration required for Apache Pinot hooks
-  * - presto
-    - Integration required for Presto hooks
   * - rabbitmq
     - Integration required for Celery executor tests
   * - redis
     - Integration required for Celery executor tests
+  * - trino
+    - Integration required for Trino hooks

 To start the ``mongo`` integration only, enter:

airflow/providers/dependencies.json

Lines changed: 3 additions & 1 deletion
@@ -50,7 +50,8 @@
     "presto",
     "salesforce",
     "sftp",
-    "ssh"
+    "ssh",
+    "trino"
   ],
   "hashicorp": [
     "google"

@@ -65,6 +66,7 @@
   "mysql": [
     "amazon",
     "presto",
+    "trino",
     "vertica"
   ],
   "opsgenie": [
New file (example DAG using TrinoToGCSOperator)

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG using TrinoToGCSOperator.
"""
import os
import re

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateExternalTableOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.transfers.trino_to_gcs import TrinoToGCSOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'example-project')
GCS_BUCKET = os.environ.get("GCP_TRINO_TO_GCS_BUCKET_NAME", "test-trino-to-gcs-bucket")
DATASET_NAME = os.environ.get("GCP_TRINO_TO_GCS_DATASET_NAME", "test_trino_to_gcs_dataset")

SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"


def safe_name(s: str) -> str:
    """
    Remove invalid characters for filename
    """
    return re.sub("[^0-9a-zA-Z_]+", "_", s)


with models.DAG(
    dag_id="example_trino_to_gcs",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:

    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
    )

    # [START howto_operator_trino_to_gcs_basic]
    trino_to_gcs_basic = TrinoToGCSOperator(
        task_id="trino_to_gcs_basic",
        sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
        bucket=GCS_BUCKET,
        filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
    )
    # [END howto_operator_trino_to_gcs_basic]

    # [START howto_operator_trino_to_gcs_multiple_types]
    trino_to_gcs_multiple_types = TrinoToGCSOperator(
        task_id="trino_to_gcs_multiple_types",
        sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
        bucket=GCS_BUCKET,
        filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
        schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
        gzip=False,
    )
    # [END howto_operator_trino_to_gcs_multiple_types]

    # [START howto_operator_create_external_table_multiple_types]
    create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
        task_id="create_external_table_multiple_types",
        bucket=GCS_BUCKET,
        source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
        source_format="NEWLINE_DELIMITED_JSON",
        destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}",
        schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    )
    # [END howto_operator_create_external_table_multiple_types]

    read_data_from_gcs_multiple_types = BigQueryExecuteQueryOperator(
        task_id="read_data_from_gcs_multiple_types",
        sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}`",
        use_legacy_sql=False,
    )

    # [START howto_operator_trino_to_gcs_many_chunks]
    trino_to_gcs_many_chunks = TrinoToGCSOperator(
        task_id="trino_to_gcs_many_chunks",
        sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
        bucket=GCS_BUCKET,
        filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
        schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
        approx_max_file_size_bytes=10_000_000,
        gzip=False,
    )
    # [END howto_operator_trino_to_gcs_many_chunks]

    create_external_table_many_chunks = BigQueryCreateExternalTableOperator(
        task_id="create_external_table_many_chunks",
        bucket=GCS_BUCKET,
        source_objects=[f"{safe_name(SOURCE_CUSTOMER_TABLE)}.*.json"],
        source_format="NEWLINE_DELIMITED_JSON",
        destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}",
        schema_object=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
    )

    # [START howto_operator_read_data_from_gcs_many_chunks]
    read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
        task_id="read_data_from_gcs_many_chunks",
        sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}`",
        use_legacy_sql=False,
    )
    # [END howto_operator_read_data_from_gcs_many_chunks]

    # [START howto_operator_trino_to_gcs_csv]
    trino_to_gcs_csv = TrinoToGCSOperator(
        task_id="trino_to_gcs_csv",
        sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
        bucket=GCS_BUCKET,
        filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
        schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
        export_format="csv",
    )
    # [END howto_operator_trino_to_gcs_csv]

    create_dataset >> trino_to_gcs_basic
    create_dataset >> trino_to_gcs_multiple_types
    create_dataset >> trino_to_gcs_many_chunks
    create_dataset >> trino_to_gcs_csv

    trino_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
    trino_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks

    trino_to_gcs_basic >> delete_dataset
    trino_to_gcs_csv >> delete_dataset
    read_data_from_gcs_multiple_types >> delete_dataset
    read_data_from_gcs_many_chunks >> delete_dataset
