Commit 02038c3

emkornfield authored and tswast committed
[BQ Storage API] Add Apache Arrow Sample (#1648)
* Adds a sample Arrow client, based on the Avro client.
* Removes an unnecessary close in the Avro storage sample.
* Fixes a comment (arrow -> avro).
* Adds a row filter to the Arrow sample.
* Updates the region tag.
1 parent 6b4c9ef commit 02038c3

File tree

4 files changed (+255, -1 lines)

bigquery/bigquerystorage/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -48,6 +48,12 @@
       <version>1.9.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>0.15.0</version>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>junit</groupId>
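
The new arrow-vector artifact supplies the allocator, vector, and IPC-deserialization classes the sample below depends on. As a quick sanity check that the dependency resolves, a self-contained snippet along these lines (the class name is illustrative, not part of this commit) exercises the 0.15 vector API:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class ArrowDependencyCheck {
  public static void main(String[] args) {
    // Allocate, fill, and read back a small vector.
    // try-with-resources closes in reverse order: vector first, then allocator.
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        IntVector vector = new IntVector("ints", allocator)) {
      vector.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        vector.set(i, i * 10);
      }
      vector.setValueCount(3);
      for (int i = 0; i < vector.getValueCount(); i++) {
        System.out.println(vector.get(i)); // prints 0, 10, 20
      }
    }
  }
}
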
bigquery/bigquerystorage/src/main/java/com/example/bigquerystorage/StorageArrowSample.java

Lines changed: 191 additions & 0 deletions

/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

// [START bigquerystorage_arrow_quickstart]

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1beta1.ArrowProto.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1beta1.ArrowProto.ArrowSchema;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.DataFormat;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableModifiers;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class StorageArrowSample {

  /*
   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches
   * transmitted from the storage API.
   */
  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // The root and loader are reused across batches to avoid re-allocation and
    // excessive garbage collection.
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      loader = new VectorLoader(root);
    }

    /**
     * Sample method for processing Arrow data which only validates decoding.
     *
     * @param batch object returned from the ReadRowsResponse.
     */
    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
          MessageSerializer.deserializeRecordBatch(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      batch.getSerializedRecordBatch().toByteArray())),
              allocator);

      loader.load(deserializedBatch);
      // Release buffers from the batch (they are still held in the vectors in root).
      deserializedBatch.close();
      System.out.println(root.contentToTSVString());
      // Release buffers from the vectors in root.
      root.clear();
    }

    @Override
    public void close() {
      root.close();
      allocator.close();
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    Integer snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Integer.parseInt(args[1]);
    }

    try (BigQueryStorageClient client = BigQueryStorageClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      TableReference tableReference =
          TableReference.newBuilder()
              .setProjectId("bigquery-public-data")
              .setDatasetId("usa_names")
              .setTableId("usa_1910_current")
              .build();

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Begin building the session request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setTableReference(tableReference)
              .setReadOptions(options)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setFormat(DataFormat.ARROW)
              // We use a LIQUID strategy in this example because we only read from a
              // single stream. Consider BALANCED if you're consuming multiple streams
              // concurrently and want more consistent stream sizes.
              .setShardingStrategy(Storage.ShardingStrategy.LIQUID)
              .setRequestedStreams(1);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        builder.setTableModifiers(modifiers);
      }

      ReadSession session = client.createReadSession(builder.build());
      // Set up a simple reader and start a read session.
      try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

        // Assert that there are streams available in the session. An empty table may not
        // have data available. If no streams are available for an anonymous (cached)
        // table, consider writing results of a query to a named table rather than
        // consuming cached results directly.
        Preconditions.checkState(session.getStreamsCount() > 0);

        // Use the first stream to perform reading.
        StreamPosition readPosition =
            StreamPosition.newBuilder().setStream(session.getStreams(0)).build();

        ReadRowsRequest readRowsRequest =
            ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

        // Process each block of rows as they arrive, decoding with our simple row reader.
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}
// [END bigquerystorage_arrow_quickstart]
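
processRows only validates decoding, dumping each loaded batch with root.contentToTSVString(). For typed, per-column access, vectors can instead be looked up by name after loader.load(...). A minimal sketch, assuming the projected STRING columns arrive as VarCharVector and the INT64 column as BigIntVector; the fragment would sit inside processRows between loader.load(deserializedBatch) and root.clear(), with imports for org.apache.arrow.vector.BigIntVector and org.apache.arrow.vector.VarCharVector added:

// Possible replacement for the contentToTSVString() call: typed column access.
VarCharVector name = (VarCharVector) root.getVector("name");
BigIntVector number = (BigIntVector) root.getVector("number");
VarCharVector state = (VarCharVector) root.getVector("state");
for (int i = 0; i < root.getRowCount(); i++) {
  // isNull guards against null entries; getObject(i) returns an Arrow Text value,
  // while BigIntVector.get(i) returns a primitive long.
  String rowName = name.isNull(i) ? null : name.getObject(i).toString();
  long rowNumber = number.isNull(i) ? 0L : number.get(i);
  String rowState = state.isNull(i) ? null : state.getObject(i).toString();
  System.out.printf("%s,%d,%s%n", rowName, rowNumber, rowState);
}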

bigquery/bigquerystorage/src/main/java/com/example/bigquerystorage/StorageSample.java

Lines changed: 0 additions & 1 deletion
@@ -160,7 +160,6 @@ public static void main(String... args) throws Exception {
         Preconditions.checkState(response.hasAvroRows());
         reader.processRows(response.getAvroRows());
       }
-      client.close();
     }
   }
 }
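
The deleted call was redundant rather than stylistic: as in the Arrow sample above (which was based on this Avro sample), the BigQueryStorageClient is created in a try-with-resources statement, so it is closed automatically when the block exits and the explicit close() amounted to a double close.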
bigquery/bigquerystorage/src/test/java/com/example/bigquerystorage/QuickstartArrowSampleIT.java

Lines changed: 58 additions & 0 deletions

/*
 * Copyright 2019 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

import static com.google.common.truth.Truth.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for quickstart sample. */
@RunWith(JUnit4.class)
@SuppressWarnings("checkstyle:abbreviationaswordinname")
public class QuickstartArrowSampleIT {
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private ByteArrayOutputStream bout;
  private PrintStream out;

  @Before
  public void setUp() {
    bout = new ByteArrayOutputStream();
    out = new PrintStream(bout);
    System.setOut(out);
  }

  @After
  public void tearDown() {
    System.setOut(null);
  }

  @Test
  public void testQuickstart() throws Exception {
    StorageArrowSample.main(PROJECT_ID);
    String got = bout.toString();
    // Ensure at least 1k of output was generated and that a specific token is present.
    assertThat(bout.size()).isGreaterThan(1024);
    assertThat(got).contains("Zayvion");
  }
}
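
A note on running this test: the *IT suffix follows the Maven Failsafe naming convention, so it presumably runs during the integration-test/verify phase rather than with unit tests, and it needs GOOGLE_CLOUD_PROJECT set to a project with the BigQuery Storage API enabled plus application default credentials available. The assertions are deliberately loose: at least 1 KiB of TSV output containing "Zayvion", a name expected in the Washington-filtered slice of the public baby names table.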
