
Commit 5a90d7b

Expand Readme with Spark SQL examples
1 parent d00148b commit 5a90d7b

File tree

1 file changed (+44, −4)

README.md

Lines changed: 44 additions & 4 deletions
````diff
@@ -9,10 +9,10 @@ Elasticsearch (__1.x__ or higher (2.x _highly_ recommended)) cluster accessible
 Significant effort has been invested to create a small, dependency-free, self-contained jar that can be downloaded and put to use without any dependencies. Simply make it available to your job classpath and you're set.
 For a certain library, see the dedicated [chapter](http://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html).
 
-ES-Hadoop 2.0.x and 2.1.x are compatible with Elasticsearch __1.X__ only
-
 ES-Hadoop 2.2.x and higher are compatible with Elasticsearch __1.X__ and __2.X__
 
+ES-Hadoop 2.0.x and 2.1.x are compatible with Elasticsearch __1.X__ *only*
+
 ## Installation
 
 ### Stable Release (currently `2.1.2`)
````
````diff
@@ -233,8 +233,21 @@ val conf = ...
 val sc = new SparkContext(conf)
 sc.esRDD("radio/artists", "?q=me*")
 ```
+
+#### Spark SQL
+```scala
+import org.elasticsearch.spark.sql._
+
+// DataFrame schema automatically inferred
+val df = sqlContext.read.format("es").load("buckethead/albums")
+
+// operations get pushed down and translated at runtime to Elasticsearch QueryDSL
+val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))
+```
+
 ### Writing
 Import the `org.elasticsearch.spark._` package to gain `saveToEs` methods on your `RDD`s:
+
 ```scala
 import org.elasticsearch.spark._
````
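The runtime pushdown mentioned in the Spark SQL reading comment means the connector translates `DataFrame` filters into Elasticsearch QueryDSL instead of filtering in Spark. As a hand-written sketch only (the exact JSON emitted varies by ES-Hadoop and Elasticsearch version), the `category`/`year` filter corresponds roughly to:

```json
{
  "query": {
    "bool": {
      "filter": [
        { "term":  { "category": "pikes" } },
        { "range": { "year": { "gte": 2016 } } }
      ]
    }
  }
}
```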

````diff
@@ -247,23 +260,40 @@ val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
 sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
 ```
 
+#### Spark SQL
+
+```scala
+import org.elasticsearch.spark.sql._
+
+val df = sqlContext.read.json("examples/people.json")
+df.saveToEs("spark/people")
+```
+
 ### Java
 
-In a Java environment, use the `org.elasticsearch.spark.java.api` package, in particular the `JavaEsSpark` class.
+In a Java environment, use the `org.elasticsearch.spark.rdd.java.api` package, in particular the `JavaEsSpark` class.
 
 ### Reading
 To read data from ES, create a dedicated `RDD` and specify the query as an argument.
 
 ```java
 import org.apache.spark.api.java.JavaSparkContext;
-import org.elasticsearch.spark.java.api.JavaEsSpark;
+import org.elasticsearch.spark.rdd.java.api.JavaEsSpark;
 
 SparkConf conf = ...
 JavaSparkContext jsc = new JavaSparkContext(conf);
 
 JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
 ```
 
+#### Spark SQL
+
+```java
+SQLContext sql = new SQLContext(sc);
+DataFrame df = sql.read().format("es").load("buckethead/albums");
+DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)));
+```
+
 ### Writing
 
 Use `JavaEsSpark` to index any `RDD` to Elasticsearch:
````
````diff
@@ -280,6 +310,16 @@ JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(doc1, doc2));
 JavaEsSpark.saveToEs(javaRDD, "spark/docs");
 ```
 
+#### Spark SQL
+
+```java
+import org.elasticsearch.spark.sql.java.api.JavaEsSparkSQL;
+
+DataFrame df = sqlContext.read().json("examples/people.json");
+JavaEsSparkSQL.saveToEs(df, "spark/docs");
+```
+
 ## [Cascading][]
 ES-Hadoop offers a dedicated Elasticsearch [Tap][], `EsTap`, that can be used either as a sink or a source. Note that `EsTap` can be used in both local (`LocalFlowConnector`) and Hadoop (`HadoopFlowConnector`) flows:
````
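For readers curious what `saveToEs` actually ships over the wire: the connector indexes documents through Elasticsearch's `_bulk` REST endpoint, one action line plus one source line per document. The following plain-Java sketch (no Spark or ES-Hadoop on the classpath; `BulkSketch`, `toJson`, and `bulkPayload` are hypothetical helpers written purely for illustration) builds such a payload for the `airports` map from the examples above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkSketch {
    // Tiny JSON serializer for flat maps — illustration only, not the connector's real serializer.
    static String toJson(Map<String, ?> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, ?> e : doc.entrySet()) {
            if (!first) sb.append(",");
            first = false;
            Object v = e.getValue();
            sb.append("\"").append(e.getKey()).append("\":");
            sb.append(v instanceof Number ? v.toString() : "\"" + v + "\"");
        }
        return sb.append("}").toString();
    }

    // One action line + one source line per document, as the _bulk endpoint expects.
    static String bulkPayload(String index, String type, Map<String, ?>... docs) {
        StringBuilder sb = new StringBuilder();
        for (Map<String, ?> doc : docs) {
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"").append(type).append("\"}}\n");
            sb.append(toJson(doc)).append("\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> airports = new LinkedHashMap<>();
        airports.put("OTP", "Otopeni");
        airports.put("SFO", "San Fran");
        System.out.print(bulkPayload("spark", "docs", airports));
    }
}
```

Each pair of lines is one index operation against `spark/docs`; the real connector batches these per partition and flushes according to its `es.batch.size.*` settings.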

0 commit comments
