/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// [START dataproc_submit_hadoop_fs_job]
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.HadoopJob;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitHadoopFSJob {

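  // Splits a space-separated argument string into the List form that
  // HadoopJob.addAllArgs expects. Note that this simple split cannot handle
  // arguments that themselves contain spaces.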
  public static ArrayList<String> stringToList(String s) {
    return new ArrayList<>(Arrays.asList(s.split(" ")));
  }

  public static void submitHadoopFSQuery() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    String hadoopFSQuery = "your-hadoop-fs-query";
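    // An illustrative value (not part of the original sample) would be "-ls /",
    // which lists the root of the cluster's default filesystem.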
    submitHadoopFSJob(projectId, region, clusterName, hadoopFSQuery);
  }

  public static void submitHadoopFSJob(
      String projectId, String region, String clusterName, String hadoopFSQuery)
      throws IOException, InterruptedException {
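    // Dataproc endpoints are regional; the endpoint must match the region of
    // the cluster that will run the job.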
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. The
    // try-with-resources closes the client automatically; alternatively,
    // call the .close() method explicitly.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Hadoop job settings. The HadoopFS query is set here.
      HadoopJob hadoopJob = HadoopJob.newBuilder()
          .setMainClass("org.apache.hadoop.fs.FsShell")
          .addAllArgs(stringToList(hadoopFSQuery))
          .build();

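      // Combine the placement and the Hadoop job configuration into a job request.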
      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

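      // Block until the job completes. get() returns the finished Job on
      // success and throws an ExecutionException if the job fails.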
      Job response = submitJobAsOperationAsyncRequest.get();

      // The job's driver output is written to a Google Cloud Storage object;
      // parse its gs://bucket/object URI so the output can be fetched and printed.
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
      if (!matches.matches()) {
        throw new IllegalStateException(
            "Unexpected driver output URI: " + response.getDriverOutputResourceUri());
      }

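      // Dataproc stores driver output in numbered chunks; this sample reads only
      // the first chunk (".000000000"), which is enough for a short job's output.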
      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format(
              "Job \"%s\" finished: %s",
              response.getReference().getJobId(),
              new String(blob.getContent(), StandardCharsets.UTF_8)));

    } catch (ExecutionException e) {
      System.err.println(String.format("submitHadoopFSJob: %s", e.getMessage()));
    }
  }
}
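
// Example invocation (illustrative values, not part of the original sample):
//
//   SubmitHadoopFSJob.submitHadoopFSJob(
//       "my-project-id", "us-central1", "my-cluster", "-ls /");
//
// Because the job's main class is org.apache.hadoop.fs.FsShell, the query string
// is an FsShell command; stringToList splits it on spaces into the job's args.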
// [END dataproc_submit_hadoop_fs_job]