Content-Length: 483132 | pFad | http://github.com/apache/airflow/pull/22852/commits/4bd14432d729f1938f9d24f6b23b0fbabd649ee9

Add ability for Dataproc operator to create cluster in GKE cluster by MaksYermak · Pull Request #22852 · apache/airflow · GitHub
Skip to content

Add ability for Dataproc operator to create cluster in GKE cluster #22852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove run_in_gke_cluster flag
  • Loading branch information
MaksYermak committed Apr 20, 2022
commit 4bd14432d729f1938f9d24f6b23b0fbabd649ee9
28 changes: 10 additions & 18 deletions airflow/providers/google/cloud/hooks/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,8 @@ def create_cluster(
region: str,
project_id: str,
cluster_name: str,
cluster_config: Union[Dict, Cluster, None],
cluster_config: Union[Dict, Cluster, None] = None,
virtual_cluster_config: Optional[Dict] = None,
run_in_gke_cluster: Optional[bool] = False,
labels: Optional[Dict[str, str]] = None,
request_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
Expand All @@ -317,8 +316,6 @@ def create_cluster(
cluster that does not directly control the underlying compute resources, for example, when
creating a `Dataproc-on-GKE cluster`
:class:`~google.cloud.dataproc_v1.types.VirtualClusterConfig`
:param run_in_gke_cluster: Optional. If true run in Google Kubernetes Engine cluster with virtual
cluster config
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``CreateClusterRequest`` requests with the same id, then the second request will be ignored and
the first ``google.longrunning.Operation`` created and stored in the backend is returned.
Expand All @@ -334,20 +331,15 @@ def create_cluster(
labels = labels or {}
labels.update({'airflow-version': 'v' + airflow_version.replace('.', '-').replace('+', '-')})

cluster = (
{
"project_id": project_id,
"cluster_name": cluster_name,
"virtual_cluster_config": virtual_cluster_config,
}
if run_in_gke_cluster
else {
"project_id": project_id,
"cluster_name": cluster_name,
"config": cluster_config,
"labels": labels,
}
)
cluster = {
"project_id": project_id,
"cluster_name": cluster_name,
}
if virtual_cluster_config is not None:
cluster['virtual_cluster_config'] = virtual_cluster_config # type: ignore
if cluster_config is not None:
cluster['config'] = cluster_config # type: ignore
cluster['labels'] = labels # type: ignore

client = self.get_cluster_client(region=region)
result = client.create_cluster(
Expand Down
6 changes: 1 addition & 5 deletions airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ class DataprocCreateClusterOperator(BaseOperator):
cluster that does not directly control the underlying compute resources, for example, when creating a
`Dataproc-on-GKE cluster
<https://cloud.google.com/dataproc/docs/concepts/jobs/dataproc-gke#create-a-dataproc-on-gke-cluster>`
:param run_in_gke_cluster: If true run in Google Kubernetes Engine cluster with virtual cluster config
:param region: The specified region where the dataproc cluster is created.
:param delete_on_error: If true the cluster will be deleted if created with ERROR state. Default
value is true.
Expand Down Expand Up @@ -461,7 +460,6 @@ def __init__(
project_id: Optional[str] = None,
cluster_config: Optional[Union[Dict, Cluster]] = None,
virtual_cluster_config: Optional[Dict] = None,
run_in_gke_cluster: bool = False,
labels: Optional[Dict] = None,
request_id: Optional[str] = None,
delete_on_error: bool = True,
Expand All @@ -482,7 +480,7 @@ def __init__(
region = 'global'

# TODO: remove one day
if cluster_config is None and not run_in_gke_cluster:
if cluster_config is None and virtual_cluster_config is None:
warnings.warn(
f"Passing cluster parameters by keywords to `{type(self).__name__}` will be deprecated. "
"Please provide cluster_config object using `cluster_config` parameter. "
Expand Down Expand Up @@ -525,7 +523,6 @@ def __init__(
self.use_if_exists = use_if_exists
self.impersonation_chain = impersonation_chain
self.virtual_cluster_config = virtual_cluster_config
self.run_in_gke_cluster = run_in_gke_cluster

def _create_cluster(self, hook: DataprocHook):
operation = hook.create_cluster(
Expand All @@ -535,7 +532,6 @@ def _create_cluster(self, hook: DataprocHook):
labels=self.labels,
cluster_config=self.cluster_config,
virtual_cluster_config=self.virtual_cluster_config,
run_in_gke_cluster=self.run_in_gke_cluster,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ With this configuration we can create the cluster:
:start-after: [START how_to_cloud_dataproc_create_cluster_operator]
:end-before: [END how_to_cloud_dataproc_create_cluster_operator]

To create a Dataproc cluster in Google Kubernetes Engine, you should enable the run_in_gke_cluster flag
and use this cluster configuration:
To create a Dataproc cluster in Google Kubernetes Engine, you should use this cluster configuration:

.. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_gke.py
:language: python
Expand Down
4 changes: 0 additions & 4 deletions tests/providers/google/cloud/operators/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ def test_execute(self, mock_hook, to_dict_mock):
'metadata': METADATA,
'cluster_config': CONFIG,
'labels': LABELS,
'run_in_gke_cluster': False,
'virtual_cluster_config': None,
}
expected_calls = self.extra_links_expected_calls_base + [
Expand Down Expand Up @@ -489,7 +488,6 @@ def test_execute_in_gke(self, mock_hook, to_dict_mock):
'metadata': METADATA,
'cluster_config': None,
'labels': LABELS,
'run_in_gke_cluster': True,
'virtual_cluster_config': VIRTUAL_CLUSTER_CONFIG,
}
expected_calls = self.extra_links_expected_calls_base + [
Expand All @@ -502,7 +500,6 @@ def test_execute_in_gke(self, mock_hook, to_dict_mock):
labels=LABELS,
cluster_name=CLUSTER_NAME,
project_id=GCP_PROJECT,
run_in_gke_cluster=True,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
request_id=REQUEST_ID,
gcp_conn_id=GCP_CONN_ID,
Expand Down Expand Up @@ -556,7 +553,6 @@ def test_execute_if_cluster_exists(self, mock_hook, to_dict_mock):
retry=RETRY,
timeout=TIMEOUT,
metadata=METADATA,
run_in_gke_cluster=False,
virtual_cluster_config=None,
)
mock_hook.return_value.get_cluster.assert_called_once_with(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
run_in_gke_cluster=True,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
)
# [END how_to_cloud_dataproc_create_cluster_operator_in_gke]
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/airflow/pull/22852/commits/4bd14432d729f1938f9d24f6b23b0fbabd649ee9

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy