2626"""
2727
2828import argparse
29+ import os
2930import re
3031
3132from google.cloud import dataproc_v1
3233from google.cloud import storage
3334
+DEFAULT_FILENAME = "pyspark_sort.py"
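+# Module-level flag, presumably toggled by an async operation callback
+# elsewhere in this sample (only its definition appears in this diff).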
+waiting_callback = False
+
+
+def get_pyspark_file(pyspark_file=None):
+    """Returns an open file object and filename for the PySpark job,
+    defaulting to DEFAULT_FILENAME in the current directory."""
+    if pyspark_file:
+        f = open(pyspark_file, "rb")
+        return f, os.path.basename(pyspark_file)
+    else:
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
+        return f, DEFAULT_FILENAME
+
+
+def get_region_from_zone(zone):
+    try:
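+        # A zone such as "us-central1-a" maps to the region "us-central1".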
+        region_as_list = zone.split("-")[:-1]
+        return "-".join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError("Invalid zone provided, please check your input.")
+
+
+def upload_pyspark_file(project, bucket_name, filename, spark_file):
+    """Uploads the given PySpark file to the configured input bucket."""
+    print("Uploading pyspark file to Cloud Storage.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(bucket_name)
+    blob = bucket.blob(filename)
+    blob.upload_from_file(spark_file)
+
+
+def download_output(project, cluster_id, output_bucket, job_id):
+    """Downloads the job driver output file from Cloud Storage and returns
+    its contents as bytes."""
+    print("Downloading output file.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(output_bucket)
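+    # Dataproc stores job driver output under this well-known prefix in the
+    # cluster's staging bucket.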
+    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
+        cluster_id, job_id
+    )
+    return bucket.blob(output_blob).download_as_string()
+
 
 # [START dataproc_create_cluster]
-def quickstart(project_id, region, cluster_name, job_file_path):
+def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
     # Create the cluster client.
     cluster_client = dataproc_v1.ClusterControllerClient(
         client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
@@ -59,6 +104,9 @@ def quickstart(project_id, region, cluster_name, job_file_path):
 
 # [END dataproc_create_cluster]
 
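+    # Stage the PySpark job file in Cloud Storage so the cluster can read it.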
+    spark_file, spark_filename = get_pyspark_file(pyspark_file)
+    upload_pyspark_file(project_id, gcs_bucket, spark_filename, spark_file)
+
 # [START dataproc_submit_job]
     # Create the job client.
     job_client = dataproc_v1.JobControllerClient(
@@ -68,7 +116,7 @@ def quickstart(project_id, region, cluster_name, job_file_path):
     # Create the job config.
     job = {
         "placement": {"cluster_name": cluster_name},
-        "pyspark_job": {"main_python_file_uri": job_file_path},
119+ "pyspark_job": {"main_python_file_uri": "gs://{}/{}".format(gcs_bucket, spark_filename) },
72120 }
73121
74122 operation = job_client.submit_job_as_operation(
@@ -128,13 +176,15 @@ def quickstart(project_id, region, cluster_name, job_file_path):
         required=True,
         help="Name to use for creating a cluster.",
     )
+
     parser.add_argument(
132- "--job_file_path",
133- type=str,
134- required=True,
135- help="Job in Cloud Storage to run on the cluster.",
181+ "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
182+ )
183+
184+ parser.add_argument(
185+ "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
     )
 
     args = parser.parse_args()
-    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)
+    quickstart(args.project_id, args.region, args.cluster_name, args.gcs_bucket, args.pyspark_file)
 # [END dataproc_quickstart]
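
A hypothetical invocation of the updated script, for reference (the script filename and all argument values are placeholders, not taken from this change; --pyspark_file may be omitted to fall back to the bundled pyspark_sort.py):

    python quickstart.py \
        --project_id=my-project \
        --region=us-central1 \
        --cluster_name=my-cluster \
        --gcs_bucket=my-staging-bucket \
        --pyspark_file=pyspark_sort.py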