Fix: updating submit_job_to_cluster.py (#387) · googleapis/python-dataproc@0719d2b
https://github.com/googleapis/python-dataproc/commit/0719d2b69661f9775c00a1fc0dade2e65b4e44e9
This repository was archived by the owner on Nov 29, 2023. It is now read-only.

Commit 0719d2b

Fix: updating submit_job_to_cluster.py (#387)
* Fix lint errors
* Update submit_job_to_cluster.py
1 parent 1981336 commit 0719d2b

File tree: 1 file changed (+83, -250 lines changed)
@@ -1,307 +1,140 @@
 #!/usr/bin/env python
+
+# Copyright 2022 Google LLC
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-r"""Sample command-line program to run a pyspark job on a new or existing
-cluster.
-
-Global region clusters are supported with --global_region flag.
-
-Example Usage to run the pyspark job on a new cluster:
-python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
---create_new_cluster --cluster_name=$CLUSTER --zone=$ZONE
-
-Example Usage to run the pyspark job on an existing global region cluster:
-python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
---global_region --cluster_name=$CLUSTER --zone=$ZONE
 
+# [START dataproc_quickstart]
 """
+Command-line program to create a Dataproc cluster,
+run a PySpark job located in Cloud Storage on the cluster,
+then delete the cluster after the job completes.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
+Usage:
+    python submit_job_to_cluster --project_id <PROJECT_ID> --region <REGION> \
+        --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
+"""
 
 import argparse
-import os
+import re
 
 from google.cloud import dataproc_v1
 from google.cloud import storage
 
 
-DEFAULT_FILENAME = "pyspark_sort.py"
-waiting_callback = False
-
-
-def get_pyspark_file(pyspark_file=None):
-    if pyspark_file:
-        f = open(pyspark_file, "rb")
-        return f, os.path.basename(pyspark_file)
-    else:
-        """Gets the PySpark file from current directory."""
-        current_dir = os.path.dirname(os.path.abspath(__file__))
-        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
-        return f, DEFAULT_FILENAME
-
-
-def get_region_from_zone(zone):
-    try:
-        region_as_list = zone.split("-")[:-1]
-        return "-".join(region_as_list)
-    except (AttributeError, IndexError, ValueError):
-        raise ValueError("Invalid zone provided, please check your input.")
-
-
-def upload_pyspark_file(project, bucket_name, filename, spark_file):
-    """Uploads the PySpark file in this directory to the configured input
-    bucket."""
-    print("Uploading pyspark file to Cloud Storage.")
-    client = storage.Client(project=project)
-    bucket = client.get_bucket(bucket_name)
-    blob = bucket.blob(filename)
-    blob.upload_from_file(spark_file)
-
-
-def download_output(project, cluster_id, output_bucket, job_id):
-    """Downloads the output file from Cloud Storage and returns it as a
-    string."""
-    print("Downloading output file.")
-    client = storage.Client(project=project)
-    bucket = client.get_bucket(output_bucket)
-    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
-        cluster_id, job_id
+# [START dataproc_create_cluster]
+def quickstart(project_id, region, cluster_name, job_file_path):
+    # Create the cluster client.
+    cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
     )
-    return bucket.blob(output_blob).download_as_string()
-
 
-# [START dataproc_submit_job_create_cluster]
-def create_cluster(dataproc, project, zone, region, cluster_name):
-    """Create the cluster."""
-    print("Creating cluster...")
-    zone_uri = "https://www.googleapis.com/compute/v1/projects/{}/zones/{}".format(
-        project, zone
-    )
-    cluster_data = {
-        "project_id": project,
+    # Create the cluster config.
+    cluster = {
+        "project_id": project_id,
         "cluster_name": cluster_name,
         "config": {
-            "gce_cluster_config": {"zone_uri": zone_uri},
-            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
-            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
         },
     }
 
-    cluster = dataproc.create_cluster(
-        request={"project_id": project, "region": region, "cluster": cluster_data}
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": project_id, "region": region, "cluster": cluster}
     )
-    cluster.add_done_callback(callback)
-    global waiting_callback
-    waiting_callback = True
-
-
-# [END dataproc_submit_job_create_cluster]
-
-
-def callback(operation_future):
-    # Reset global when callback returns.
-    global waiting_callback
-    waiting_callback = False
-
-
-def wait_for_cluster_creation():
-    """Wait for cluster creation."""
-    print("Waiting for cluster creation...")
-
-    while True:
-        if not waiting_callback:
-            print("Cluster created.")
-            break
-
-
-# [START dataproc_list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project, region):
-    """List the details of clusters in the region."""
-    for cluster in dataproc.list_clusters(
-        request={"project_id": project, "region": region}
-    ):
-        print(("{} - {}".format(cluster.cluster_name, cluster.status.state.name,)))
-
+    result = operation.result()
 
-# [END dataproc_list_clusters_with_detail]
+    print("Cluster created successfully: {}".format(result.cluster_name))
 
+    # [END dataproc_create_cluster]
 
-def get_cluster_id_by_name(dataproc, project_id, region, cluster_name):
-    """Helper function to retrieve the ID and output bucket of a cluster by
-    name."""
-    for cluster in dataproc.list_clusters(
-        request={"project_id": project_id, "region": region}
-    ):
-        if cluster.cluster_name == cluster_name:
-            return cluster.cluster_uuid, cluster.config.config_bucket
-
+    # [START dataproc_submit_job]
+    # Create the job client.
+    job_client = dataproc_v1.JobControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
+    )
 
-# [START dataproc_submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name, filename):
-    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
-    to `bucket_name."""
-    job_details = {
+    # Create the job config.
+    job = {
         "placement": {"cluster_name": cluster_name},
-        "pyspark_job": {
-            "main_python_file_uri": "gs://{}/{}".format(bucket_name, filename)
-        },
+        "pyspark_job": {"main_python_file_uri": job_file_path},
     }
 
-    result = dataproc.submit_job(
-        request={"project_id": project, "region": region, "job": job_details}
+    operation = job_client.submit_job_as_operation(
+        request={"project_id": project_id, "region": region, "job": job}
    )
-    job_id = result.reference.job_id
-    print("Submitted job ID {}.".format(job_id))
-    return job_id
-
+    response = operation.result()
 
-# [END dataproc_submit_pyspark_job]
+    # Dataproc job output is saved to the Cloud Storage bucket
+    # allocated to the job. Use regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
 
-
-# [START dataproc_delete]
-def delete_cluster(dataproc, project, region, cluster):
-    """Delete the cluster."""
-    print("Tearing down cluster.")
-    result = dataproc.delete_cluster(
-        request={"project_id": project, "region": region, "cluster_name": cluster}
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_string()
     )
-    return result
-
-
-# [END dataproc_delete]
-
-
-# [START dataproc_wait]
-def wait_for_job(dataproc, project, region, job_id):
-    """Wait for job to complete or error out."""
-    print("Waiting for job to finish...")
-    while True:
-        job = dataproc.get_job(
-            request={"project_id": project, "region": region, "job_id": job_id}
-        )
-        # Handle exceptions
-        if job.status.State(job.status.state).name == "ERROR":
-            raise Exception(job.status.details)
-        if job.status.State(job.status.state).name == "DONE":
-            print("Job finished.")
-            return job
-
-
-# [END dataproc_wait]
-
-
-def main(
-    project_id,
-    zone,
-    cluster_name,
-    bucket_name,
-    pyspark_file=None,
-    create_new_cluster=True,
-    global_region=True,
-):
 
-    # [START dataproc_get_client]
-    if global_region:
-        region = "global"
-        # Use the default gRPC global endpoints.
-        dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
-        dataproc_job_client = dataproc_v1.JobControllerClient()
-    else:
-        region = get_region_from_zone(zone)
-        # Use a regional gRPC endpoint. See:
-        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
-            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
-        )
-        dataproc_job_client = dataproc_v1.ClusterControllerClient(
-            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
-        )
-    # [END dataproc_get_client]
-
-    try:
-        spark_file, spark_filename = get_pyspark_file(pyspark_file)
-        if create_new_cluster:
-            create_cluster(
-                dataproc_cluster_client, project_id, zone, region, cluster_name
-            )
-            wait_for_cluster_creation()
-        upload_pyspark_file(project_id, bucket_name, spark_filename, spark_file)
-
-        list_clusters_with_details(dataproc_cluster_client, project_id, region)
-
-        (cluster_id, output_bucket) = get_cluster_id_by_name(
-            dataproc_cluster_client, project_id, region, cluster_name
-        )
-
-        # [START dataproc_call_submit_pyspark_job]
-        job_id = submit_pyspark_job(
-            dataproc_job_client,
-            project_id,
-            region,
-            cluster_name,
-            bucket_name,
-            spark_filename,
-        )
-        # [END dataproc_call_submit_pyspark_job]
+    print(f"Job finished successfully: {output}\r\n")
+    # [END dataproc_submit_job]
+
+    # [START dataproc_delete_cluster]
+    # Delete the cluster once the job has terminated.
+    operation = cluster_client.delete_cluster(
+        request={
+            "project_id": project_id,
+            "region": region,
+            "cluster_name": cluster_name,
+        }
+    )
+    operation.result()
 
-        wait_for_job(dataproc_job_client, project_id, region, job_id)
-        output = download_output(project_id, cluster_id, output_bucket, job_id)
-        print("Received job output {}".format(output))
-        return output
-    finally:
-        if create_new_cluster:
-            delete_cluster(dataproc_cluster_client, project_id, region, cluster_name)
-            spark_file.close()
+    print("Cluster {} successfully deleted.".format(cluster_name))
+    # [END dataproc_delete_cluster]
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
-    )
-    parser.add_argument(
-        "--project_id", help="Project ID you want to access.", required=True
-    )
-    parser.add_argument(
-        "--zone", help="Zone to create clusters in/connect to", required=True
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
     )
     parser.add_argument(
-        "--cluster_name", help="Name of the cluster to create/connect to", required=True
+        "--project_id",
+        type=str,
+        required=True,
+        help="Project to use for creating resources.",
     )
     parser.add_argument(
-        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
+        "--region",
+        type=str,
+        required=True,
+        help="Region where the resources should live.",
     )
     parser.add_argument(
-        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
+        "--cluster_name",
+        type=str,
+        required=True,
+        help="Name to use for creating a cluster.",
     )
     parser.add_argument(
-        "--create_new_cluster",
-        action="store_true",
-        help="States if the cluster should be created",
-    )
-    parser.add_argument(
-        "--global_region",
-        action="store_true",
-        help="If cluster is in the global region",
+        "--job_file_path",
+        type=str,
+        required=True,
+        help="Job in Cloud Storage to run on the cluster.",
     )
 
     args = parser.parse_args()
-    main(
-        args.project_id,
-        args.zone,
-        args.cluster_name,
-        args.gcs_bucket,
-        args.pyspark_file,
-        args.create_new_cluster,
-        args.global_region,
-    )
+    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)
+# [END dataproc_quickstart]
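
Note on the new output-handling step: the sample parses response.driver_output_resource_uri with re.match("gs://(.*?)/(.*)", ...). A minimal sketch of that parsing, assuming a placeholder URI of the general shape Dataproc returns (the bucket and path below are made up, not values from this commit):

import re

# Placeholder driver-output URI; in the sample this value comes from
# response.driver_output_resource_uri.
uri = "gs://example-staging-bucket/google-cloud-dataproc-metainfo/1234/jobs/job-5678/driveroutput"

matches = re.match("gs://(.*?)/(.*)", uri)
print(matches.group(1))  # non-greedy match up to the first "/": the bucket name
print(matches.group(2))  # the remaining blob prefix within that bucket
# The sample appends ".000000000" to this prefix to download the first
# chunk of the driver log with google-cloud-storage.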

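For completeness, a hypothetical way to drive the new sample from Python rather than the command line; all argument values are placeholders, and the import assumes submit_job_to_cluster.py is on the import path:

# Placeholder values; substitute a real project, region, cluster name,
# and a PySpark file that exists in Cloud Storage.
from submit_job_to_cluster import quickstart

quickstart(
    project_id="my-project",
    region="us-central1",
    cluster_name="my-quickstart-cluster",
    job_file_path="gs://my-bucket/pyspark_sort.py",
)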
Comments (0)







