
Commit 933900b

Adding HadoopFS sample
1 parent 52c592f commit 933900b

2 files changed, +242 -0 lines changed
@@ -0,0 +1,151 @@
/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// [START dataproc_submit_hadoop_fs_job]
/* This sample walks a user through submitting a Hadoop FS job (a run of
 * org.apache.hadoop.fs.FsShell) to an existing Cloud Dataproc cluster, waiting
 * for the job to finish and reading its driver output from Google Cloud
 * Storage, all using the Java client library.
 *
 * Usage:
 * mvn clean package -DskipTests
 *
 * mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <HADOOP_FS_QUERY>"
 *
 * You can also set these arguments in the main function instead of providing them via the CLI.
 */

import com.google.cloud.dataproc.v1.*;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubmitHadoopFSJob {

  public static Job waitForJobCompletion(
      JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
    while (true) {
      // Poll the service periodically until the Job is in a finished state.
      Job jobInfo = jobControllerClient.getJob(projectId, region, jobId);
      switch (jobInfo.getStatus().getState()) {
        case DONE:
        case CANCELLED:
        case ERROR:
          return jobInfo;
        default:
          try {
            // Wait a second in between polling attempts.
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
      }
    }
  }

  public static void submitHadoopFSJob(
      String projectId, String region, String clusterName, String hadoopFSQuery)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the cluster controller and job controller clients.
    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create both a cluster controller client and a job controller client with the
    // configured settings. The clients only need to be created once and can be reused for
    // multiple requests. Using a try-with-resources closes the clients, but this can also be
    // done manually with the .close() method.
    try (ClusterControllerClient clusterControllerClient =
            ClusterControllerClient.create(clusterControllerSettings);
        JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Hadoop job settings. The HadoopFS query is split into arguments and set here.
      HadoopJob hadoopJob =
          HadoopJob.newBuilder()
              .setMainClass("org.apache.hadoop.fs.FsShell")
              .addAllArgs(Arrays.asList(hadoopFSQuery.split(" ")))
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();

      // Submit an asynchronous request to execute the job.
      Job request = jobControllerClient.submitJob(projectId, region, job);
      String jobId = request.getReference().getJobId();
      System.out.println(String.format("Submitted job \"%s\"", jobId));

      // Wait for the job to finish.
      CompletableFuture<Job> finishedJobFuture =
          CompletableFuture.supplyAsync(
              () -> waitForJobCompletion(jobControllerClient, projectId, region, jobId));
      int timeout = 10;
      try {
        Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
        System.out.println(String.format("Job %s finished.", jobId));

        // Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
        Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob =
            storage.get(
                clusterInfo.getConfig().getConfigBucket(),
                String.format(
                    "google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000",
                    clusterInfo.getClusterUuid(), jobId));
        System.out.println(
            String.format(
                "Job \"%s\" finished with state %s:%n%s",
                jobId, jobInfo.getStatus().getState(), new String(blob.getContent())));
      } catch (TimeoutException e) {
        System.err.println(
            String.format("Job timed out after %d minutes: %s", timeout, e.getMessage()));
      }
    } catch (ExecutionException e) {
      System.err.println(String.format("Error submitting the Hadoop FS job: %s", e.getMessage()));
    }
  }

  public static void main(String... args) throws IOException, InterruptedException {
    if (args.length != 4) {
      System.err.println(
          "Insufficient number of parameters provided. Please make sure a "
              + "PROJECT_ID, REGION, CLUSTER_NAME and HADOOP_FS_QUERY are provided, in this order.");
      return;
    }

    String projectId = args[0]; // project-id of the project the cluster belongs to
    String region = args[1]; // region of the cluster
    String clusterName = args[2]; // name of the cluster the job is submitted to
    String hadoopFSQuery = args[3]; // a Hadoop FS command, e.g. "-ls /"

    submitHadoopFSJob(projectId, region, clusterName, hadoopFSQuery);
  }
}
// [END dataproc_submit_hadoop_fs_job]
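
For reference, here is one way to invoke the sample once it is built, following the usage comment in the file. The project id, region, cluster name, and query below are placeholder values rather than anything from this commit; the cluster must already exist, and shell quoting of the multi-word query may need adjusting for your environment:

mvn clean package -DskipTests
mvn exec:java -Dexec.args="my-project-id us-east1 my-test-cluster '-ls /'"

The job runs org.apache.hadoop.fs.FsShell on the cluster with the given arguments, so this invocation should print a listing of the root of the cluster's default filesystem as the job's driver output.
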
@@ -0,0 +1,91 @@
/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.protobuf.Empty;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static junit.framework.TestCase.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;

@RunWith(JUnit4.class)
public class SubmitHadoopFSJobTest {

  private static final String CLUSTER_NAME =
      String.format("java-fs-test--%s", UUID.randomUUID().toString());
  private static final String REGION = "us-east1";
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private ByteArrayOutputStream bout;

  private static void requireEnv(String varName) {
    assertNotNull(
        String.format("Environment variable '%s' is required to perform these tests.", varName),
        System.getenv(varName));
  }

  @BeforeClass
  public static void checkRequirements() {
    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
    requireEnv("GOOGLE_CLOUD_PROJECT");
  }

  @Before
  public void setUp() throws IOException, InterruptedException {
    bout = new ByteArrayOutputStream();
    System.setOut(new PrintStream(bout));

    // Create a cluster for the job to run on. This reuses the CreateCluster sample.
    CreateCluster.createCluster(PROJECT_ID, REGION, CLUSTER_NAME);
  }

  @Test
  public void submitHadoopFSJobTest() throws IOException, InterruptedException {
    SubmitHadoopFSJob.submitHadoopFSJob(PROJECT_ID, REGION, CLUSTER_NAME, "-ls /");
    String output = bout.toString();

    System.out.println(output);
    assertThat(output, CoreMatchers.containsString("finished"));
  }

  @After
  public void tearDown() throws IOException, InterruptedException, ExecutionException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);

    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    try (ClusterControllerClient clusterControllerClient =
        ClusterControllerClient.create(clusterControllerSettings)) {
      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
      deleteClusterAsyncRequest.get();
    }
  }
}
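
The test can be run on its own with Maven's surefire test filter, assuming the sample project's standard Maven layout; the project id and key path below are placeholders. Note that the test creates a real Dataproc cluster in setUp and deletes it in tearDown, which takes several minutes and incurs charges:

export GOOGLE_CLOUD_PROJECT=my-project-id
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
mvn test -Dtest=SubmitHadoopFSJobTest
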
