
Update Dataproc samples. #2158


Merged
merged 13 commits into from
May 20, 2019
38 changes: 16 additions & 22 deletions dataproc/README.md
@@ -1,4 +1,4 @@
# Cloud Dataproc API Examples

[![Open in Cloud Shell][shell_img]][shell_link]

@@ -7,21 +7,20 @@

Sample command-line programs for interacting with the Cloud Dataproc API.


See [the tutorial on using the Dataproc API with the Python client
library](https://cloud.google.com/dataproc/docs/tutorials/python-library-example)
for information on a walkthrough you can run to try out the Cloud Dataproc API sample code.

Note that while these samples demonstrate interacting with Dataproc via the API, the functionality demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.

`list_clusters.py` is a simple command-line program to demonstrate connecting to the Cloud Dataproc API and listing the clusters in a region.

`submit_job_to_cluster.py` demonstrates how to create a cluster, submit the
`pyspark_sort.py` job, download the output from Google Cloud Storage, and output the result.

`single_job_workflow.py` uses the Cloud Dataproc InstantiateInlineWorkflowTemplate API to create an ephemeral cluster, run a job, then delete the cluster with one API request.
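
At its core, `single_job_workflow.py` builds an inline workflow template and submits it with a single `instantiate_inline_workflow_template` call. The snippet below is only a rough sketch of that idea (the project, region, zone, bucket, step ID, and cluster sizes are illustrative placeholders); see `single_job_workflow.py` for the complete, tested sample.

```python
from google.cloud import dataproc_v1


def run_inline_workflow(project_id, region, bucket_name):
    """Run a PySpark job on an ephemeral, workflow-managed cluster."""
    workflow_client = dataproc_v1.WorkflowTemplateServiceClient()
    parent = 'projects/{}/regions/{}'.format(project_id, region)

    template = {
        'jobs': [{
            'pyspark_job': {
                # Placeholder path; upload pyspark_sort.py to your own bucket.
                'main_python_file_uri':
                    'gs://{}/pyspark_sort.py'.format(bucket_name),
            },
            'step_id': 'pyspark-sort',
        }],
        'placement': {
            'managed_cluster': {
                'cluster_name': 'ephemeral-cluster',
                'config': {
                    # Placeholder zone; pick one inside your chosen region.
                    'gce_cluster_config': {'zone_uri': 'us-central1-b'},
                    'master_config': {'num_instances': 1},
                    'worker_config': {'num_instances': 2},
                },
            },
        },
    }

    # The returned long-running operation covers cluster creation, the job,
    # and cluster deletion; result() blocks until the workflow finishes.
    operation = workflow_client.instantiate_inline_workflow_template(
        parent, template)
    operation.result()
```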

`pyspark_sort.py_gcs` is the same as `pyspark_sort.py` but demonstrates
reading from a GCS bucket.
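
In a PySpark job, reading from Cloud Storage is just a matter of passing a `gs://` URI to Spark. For example (the bucket and object below are placeholders for your own):

```python
import pyspark

# Replace the path with your own bucket and object; the Cloud Storage
# connector on Dataproc clusters lets Spark read gs:// paths directly.
sc = pyspark.SparkContext()
rdd = sc.textFile('gs://your-staging-bucket/input/sample.txt')
print(sorted(rdd.collect()))
```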

## Prerequisites to run locally:
@@ -59,32 +58,27 @@ To run list_clusters.py:

    python list_clusters.py --project_id=$GOOGLE_CLOUD_PROJECT --region=$REGION

`submit_job_to_cluster.py` can create the Dataproc cluster or use an existing cluster. To create a cluster before running the code, you can use the [Cloud Console](https://console.cloud.google.com) or run:

    gcloud dataproc clusters create your-cluster-name
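
You can also create the cluster from Python with the same client library the samples use. This is only a minimal sketch (the zone, machine counts, and cluster name are placeholders, and regions other than `global` may require the regional endpoint shown in `list_clusters.py`):

```python
from google.cloud import dataproc_v1


def create_cluster(project_id, region, cluster_name):
    """Create a small Dataproc cluster and wait for it to be ready."""
    cluster_client = dataproc_v1.ClusterControllerClient()
    cluster = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            # Placeholder zone; pick one inside the region you are using.
            'gce_cluster_config': {'zone_uri': 'us-central1-b'},
            'master_config': {'num_instances': 1},
            'worker_config': {'num_instances': 2},
        },
    }
    # create_cluster returns a long-running operation; result() blocks
    # until the cluster is running.
    operation = cluster_client.create_cluster(project_id, region, cluster)
    operation.result()
```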

To run submit_job_to_cluster.py, first create a GCS bucket (used by Cloud Dataproc to stage files) from the Cloud Console or with gsutil:

    gsutil mb gs://<your-staging-bucket-name>

Next, set the following environment variables:

    BUCKET=your-staging-bucket
    CLUSTER=your-cluster-name

Then, if you want to use an existing cluster, run:

    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET

Alternatively, to create a new cluster, which will be deleted at the end of the job, run:

    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET --create_new_cluster

The script will set up a cluster, upload the PySpark file, submit the job, print the result, and then, if it created the cluster, delete the cluster.
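
For reference, the job-submission step boils down to a single `submit_job` call against the Cloud Dataproc job controller. The sketch below is simplified (it assumes the cluster already exists and that `pyspark_sort.py` has been uploaded to your staging bucket) and is not the exact code in `submit_job_to_cluster.py`:

```python
from google.cloud import dataproc_v1


def submit_pyspark_job(project_id, region, cluster_name, bucket_name):
    """Submit the pyspark_sort.py job and return its job ID."""
    job_client = dataproc_v1.JobControllerClient()
    job = {
        'placement': {'cluster_name': cluster_name},
        'pyspark_job': {
            # Placeholder path; point this at your uploaded PySpark file.
            'main_python_file_uri':
                'gs://{}/pyspark_sort.py'.format(bucket_name),
        },
    }
    result = job_client.submit_job(project_id, region, job)
    print('Submitted job ID {}.'.format(result.reference.job_id))
    return result.reference.job_id
```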

Optionally, you can add the `--pyspark_file` argument to run a different PySpark script in place of the default `pyspark_sort.py` included with this sample.
54 changes: 29 additions & 25 deletions dataproc/list_clusters.py
@@ -10,49 +10,53 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample command-line program to list Cloud Dataproc clusters in a region.

""" Sample command-line program for listing Google Dataproc Clusters
"""
Example usage:
python list_clusters.py --project_id=my-project-id --region=global

"""
import argparse

import googleapiclient.discovery
from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import (
cluster_controller_grpc_transport)


# [START dataproc_list_clusters]
def list_clusters(dataproc, project, region):
result = dataproc.projects().regions().clusters().list(
projectId=project,
region=region).execute()
return result
"""List the details of clusters in the region."""
for cluster in dataproc.list_clusters(project, region):
print(('{} - {}'.format(cluster.cluster_name,
cluster.status.State.Name(
cluster.status.state))))
# [END dataproc_list_clusters]


# [START dataproc_get_client]
def get_client():
"""Builds a client to the dataproc API."""
dataproc = googleapiclient.discovery.build('dataproc', 'v1')
return dataproc
# [END dataproc_get_client]
def main(project_id, region):

if region == 'global':
# Use the default gRPC global endpoints.
dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
else:
# Use a regional gRPC endpoint. See:
# https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
client_transport = (
cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address='{}-dataproc.googleapis.com:443'.format(region)))
dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
client_transport)

def main(project_id, region):
dataproc = get_client()
result = list_clusters(dataproc, project_id, region)
print(result)
list_clusters(dataproc_cluster_client, project_id, region)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
description=__doc__, formatter_class=(
argparse.RawDescriptionHelpFormatter))
parser.add_argument(
'project_id', help='Project ID you want to access.'),
# Sets the region to "global" if it's not provided
# Note: sub-regions (e.g.: us-central1-a/b) are currently not supported
'--project_id', help='Project ID to access.', required=True)
parser.add_argument(
'--region', default='global', help='Region to list clusters')
'--region', help='Region of clusters to list.', required=True)

args = parser.parse_args()
main(args.project_id, args.region)
6 changes: 3 additions & 3 deletions dataproc/python-api-walkthrough.md
@@ -121,7 +121,7 @@ Job output in Cloud Shell shows cluster creation, job submission,
...
Creating cluster...
Cluster created.
Uploading pyspark file to Cloud Storage
new-cluster-name - RUNNING
Submitted job ID ...
Waiting for job to finish...
@@ -140,12 +140,12 @@ Job output in Cloud Shell shows cluster creation, job submission,
### Next Steps:

* **View job details from the Console.** View job details by selecting the
PySpark job from the Cloud Dataproc
[Jobs page](https://console.cloud.google.com/dataproc/jobs)
in the Google Cloud Platform Console.

* **Delete resources used in the walkthrough.**
The `submit_job_to_cluster.py` job deletes the cluster that it created for this
walkthrough.

If you created a bucket to use for this walkthrough,
3 changes: 2 additions & 1 deletion dataproc/requirements.txt
@@ -1,5 +1,6 @@
grpcio>=1.2.0
google-auth==1.6.2
google-auth-httplib2==0.0.3
google-cloud==0.34.0
google-cloud-storage==1.13.2
google-cloud-dataproc==0.3.1