Add Es Spark Accumulators #2159

Open · wants to merge 2 commits into main
7 changes: 7 additions & 0 deletions docs/src/reference/asciidoc/core/configuration.adoc
@@ -459,6 +459,13 @@ datasets. Jobs executed against a red cluster will yield inconsistent results wh
compared to jobs executed against a fully green or yellow cluster. Use this setting with
caution.

[float]
==== Metrics

added[8.12]
`es.metrics.prefix` (default empty)::
Applies only to the Spark metrics. When set, the names of the reported accumulators are prefixed with this value followed by a dot separator.

[float]
==== Input

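For the new `es.metrics.prefix` option above, a minimal, hedged sketch (not part of this diff) of how it might be set from a Spark job; the application name, prefix, node address and `spark-test/data` resource are illustrative only:

[source, scala]
----
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object MetricsPrefixExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-metrics-prefix-example")
      .set("es.nodes", "localhost:9200")
      // every accumulator registered by elasticsearch-hadoop is then named "nightly-load.<COUNTER>"
      .set("es.metrics.prefix", "nightly-load")

    val sc = new SparkContext(conf)
    val docs = Seq(Map("field1" -> "doc1"), Map("field1" -> "doc2"))
    sc.makeRDD(docs).saveToEs("spark-test/data")
    sc.stop()
  }
}
----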
15 changes: 10 additions & 5 deletions docs/src/reference/asciidoc/core/metrics.adoc
@@ -1,15 +1,18 @@
[[metrics]]
== Hadoop Metrics

The Hadoop system records a set of metric counters for each job that it runs. {eh} extends on that and provides metrics about its activity for each job run by leveraging the Hadoop http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/org/apache/hadoop/mapred/Counters.html[Counters] infrastructure. During each run, {eh} sends statistics from each task instance, as it is running, which get aggregated by the {mr} infrastructure and are available through the standard Hadoop APIs.
The Hadoop {mr} system records a set of metric counters for each job that it runs. {eh} extends on that and provides metrics about its activity for each job run by leveraging the Hadoop http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/org/apache/hadoop/mapred/Counters.html[Counters] infrastructure. During each run, {eh} sends statistics from each task instance, as it is running, which get aggregated by the {mr} infrastructure and are available through the standard Hadoop APIs.

{eh} provides the following counters, available under `org.elasticsearch.hadoop.mr.Counter` enum:
For Spark, the same principle applies through https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators[Accumulators].

.Available counters
{eh} provides the following counters, available under the `org.elasticsearch.hadoop.mr.Counter` enum, or the equivalent accumulators, available under the
`org.elasticsearch.spark.acc.EsSparkAccumulators` object, depending on the framework in use:

.Available counters and accumulators
[cols="^,^",options="header"]

|===
| Counter name | Purpose
| Name | Purpose

2+h| Data focused

@@ -42,7 +45,7 @@ The Hadoop system records a set of metric counters for each job that it runs. {e

|===

One can use the counters programatically, depending on the API used, through http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/index.html?org/apache/hadoop/mapred/Counters.html[mapred] or http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/index.html?org/apache/hadoop/mapreduce/Counter.html[mapreduce]. Whatever the choice, {eh} performs automatic reports without any user intervention. In fact, when using {eh} one will see the stats reported at the end of the job run, for example:
One can use the counters programmatically, depending on the API used, through http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/index.html?org/apache/hadoop/mapred/Counters.html[mapred] or http://hadoop.apache.org/docs/r{hadoop-docs-v}/api/index.html?org/apache/hadoop/mapreduce/Counter.html[mapreduce]. Whatever the choice, {eh} performs automatic reports without any user intervention. In fact, when using {eh} one will see the stats reported at the end of the job run, for example:

[source, bash]
----
@@ -69,3 +72,5 @@ Elasticsearch Hadoop Counters
Scroll Total Time(ms)=0

----

For Spark, the same statistics are available through the registered accumulators, for example in the Spark UI or via `EsSparkAccumulators.getAll`.
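As a hedged sketch (not part of this diff), driver code could inspect them once an action has completed; only the public `getAll` accessor is used, and the `my-job` prefix is illustrative:

[source, scala]
----
import org.elasticsearch.spark.acc.EsSparkAccumulators

// Each accumulator is named "<prefix>.<counter>", e.g. "my-job.DOCS_SENT".
EsSparkAccumulators.getAll
  .toSeq
  .sortBy(_.name.getOrElse(""))
  .foreach(acc => println(s"${acc.name.getOrElse("<unnamed>")} = ${acc.value}"))
----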
ConfigurationOptions.java (package org.elasticsearch.hadoop.cfg)
@@ -325,4 +325,8 @@ public interface ConfigurationOptions {
/** Security options **/
String ES_SECURITY_AUTHENTICATION = "es.security.authentication";
String ES_SECURITY_USER_PROVIDER_CLASS = "es.security.user.provider.class";

/** Metrics **/
String ES_METRICS_PREFIX = "es.metrics.prefix";
String ES_METRICS_PREFIX_DEFAULT = "";
}
4 changes: 4 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -708,6 +708,10 @@ public String getSecurityUserProviderClass() {
return getProperty(ConfigurationOptions.ES_SECURITY_USER_PROVIDER_CLASS);
}

public String getMetricsPrefix() {
return getProperty(ES_METRICS_PREFIX, ES_METRICS_PREFIX_DEFAULT);
}

public abstract InputStream loadResource(String location);

public abstract Settings copy();
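As a hedged illustration of the getter above, using the existing `PropertiesSettings` implementation and assuming the usual `setProperty` mutator on `Settings`: the prefix resolves to the empty string unless `es.metrics.prefix` has been set.

[source, scala]
----
import org.elasticsearch.hadoop.cfg.{ConfigurationOptions, PropertiesSettings}

val settings = new PropertiesSettings()
assert(settings.getMetricsPrefix == "")   // falls back to ES_METRICS_PREFIX_DEFAULT
settings.setProperty(ConfigurationOptions.ES_METRICS_PREFIX, "my-job")
assert(settings.getMetricsPrefix == "my-job")
----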
AbstractScalaEsScalaSpark.scala (Spark integration test)
@@ -51,6 +51,7 @@ import org.elasticsearch.hadoop.util.EsMajorVersion
import org.elasticsearch.hadoop.util.StringUtils
import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.spark.acc.EsSparkAccumulators
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata.ID
import org.elasticsearch.spark.rdd.Metadata.TTL
@@ -908,6 +909,46 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
}
}

@Test
def testSparkAccumulators() = {
def config(metricsPrefix: String): Map[String, String] = {
val index = wrapIndex(s"spark-test-accumulators-${metricsPrefix}")
val typename = "data"
val target = resource(index, typename, version)
Map[String, String](
ES_RESOURCE_READ -> target,
ES_RESOURCE_WRITE -> target,
ES_METRICS_PREFIX -> metricsPrefix
)
}

val data = Seq(
Map("field1" -> "doc1"),
Map("field1" -> "doc2"),
Map("field1" -> "doc3")
)
val inputRdd = sc.makeRDD(data)

inputRdd.saveToEs(config("single-write"))

val doubleWritesConfig = config("double-write")
inputRdd.saveToEs(doubleWritesConfig)
inputRdd.saveToEs(doubleWritesConfig)

val readWriteConfig = config("read-write")
inputRdd.saveToEs(readWriteConfig)
val outputRdd = sc.esRDD(readWriteConfig)

import org.elasticsearch.hadoop.mr.Counter._
assertEquals(inputRdd.count(), EsSparkAccumulators.get("single-write", DOCS_SENT))
assertEquals(inputRdd.count() * 2, EsSparkAccumulators.get("double-write", DOCS_SENT))
assertEquals(inputRdd.count(), EsSparkAccumulators.get("read-write", DOCS_SENT))
assertEquals(0, EsSparkAccumulators.get("read-write", DOCS_RECEIVED)) // reads are lazy: nothing is received until the count() action below runs
assertEquals(outputRdd.count(), EsSparkAccumulators.get("read-write", DOCS_RECEIVED))
assertEquals(outputRdd.count() * 2, EsSparkAccumulators.get("read-write", DOCS_RECEIVED))
EsSparkAccumulators.clear()
}

def wrapIndex(index: String) = {
prefix + index
}
EsSparkAccumulators.scala (new file, package org.elasticsearch.spark.acc)
@@ -0,0 +1,49 @@
package org.elasticsearch.spark.acc

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator
import org.elasticsearch.hadoop.mr.Counter
import org.elasticsearch.hadoop.rest.stats.Stats

import scala.collection.JavaConverters.asScalaSetConverter
import scala.collection.mutable

// Driver-side registry of named Spark accumulators mirroring the Hadoop Counter set.
object EsSparkAccumulators {

  // One LongAccumulator per (prefix, counter) pair, keyed by the full accumulator name.
  private val accumulators: mutable.Map[String, LongAccumulator] = mutable.Map[String, LongAccumulator]()

  private val counters = Counter.ALL.asScala

  def getAll: Set[LongAccumulator] = accumulators.values.toSet

  // Called on the driver before the job runs; registering an already-known name is a no-op.
  private[spark] def build(sc: SparkContext, prefix: String): Unit = {
    for (counter <- counters) {
      val fullName = getAccumulatorFullName(prefix, counter.name())
      if (!accumulators.contains(fullName)) {
        accumulators += (fullName -> sc.longAccumulator(fullName))
      }
    }
  }

  // Called from task-completion callbacks to fold the task-local stats into the accumulators.
  private[spark] def add(stats: Stats, prefix: String): Unit = {
    if (accumulators.nonEmpty) {
      for (counter <- counters) {
        accumulators.get(getAccumulatorFullName(prefix, counter.name())).foreach(_.add(counter.get(stats)))
      }
    }
  }

  private[spark] def get(prefix: String, counter: Counter): Long = {
    accumulators(getAccumulatorFullName(prefix, counter.name())).value
  }

  private[spark] def clear(): Unit = {
    accumulators.clear()
  }

  // "<prefix>.<counter name>" when a prefix is configured, the bare counter name otherwise.
  private def getAccumulatorFullName(prefix: String, name: String): String = {
    val prefixWithSep = if (prefix.isEmpty) "" else s"${prefix}."
    s"${prefixWithSep}${name}"
  }

}
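The object above only registers accumulators on the driver (`build`) and folds per-task statistics into them on task completion (`add`). A self-contained sketch of that same pattern with plain Spark APIs, to show why registration has to happen on the driver before any task runs; the names are illustrative:

[source, scala]
----
import org.apache.spark.sql.SparkSession

object AccumulatorPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("acc-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Driver side: register a named accumulator up front (what EsSparkAccumulators.build does).
    val docsSent = sc.longAccumulator("my-job.DOCS_SENT")

    // Executor side: tasks add to it (what EsSparkAccumulators.add does from completion callbacks).
    sc.parallelize(1 to 3).foreach(_ => docsSent.add(1))

    // Driver side: read the aggregated value once the action has completed.
    println(s"${docsSent.name.getOrElse("")} = ${docsSent.value}") // my-job.DOCS_SENT = 3
    spark.stop()
  }
}
----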
AbstractEsRDD.scala (package org.elasticsearch.spark.rdd)
@@ -19,6 +19,7 @@
package org.elasticsearch.spark.rdd;

import JDKCollectionConvertersCompat.Converters._

import scala.reflect.ClassTag
import org.apache.commons.logging.LogFactory
import org.apache.spark.Partition
@@ -31,6 +32,7 @@ import org.elasticsearch.hadoop.rest.PartitionDefinition
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.rest.RestRepository
import org.elasticsearch.spark.acc.EsSparkAccumulators

import scala.annotation.meta.param

@@ -67,6 +69,8 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
}
}

EsSparkAccumulators.build(sc, esCfg.getMetricsPrefix)

@transient private[spark] lazy val esCfg = {
val cfg = new SparkSettingsManager().load(sc.getConf).copy();
cfg.merge(params.asJava)
AbstractEsRDDIterator.scala (package org.elasticsearch.spark.rdd)
@@ -24,6 +24,7 @@ import org.apache.spark.TaskKilledException
import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.rest.PartitionDefinition
import org.elasticsearch.spark.acc.EsSparkAccumulators

import java.util.Locale

@@ -41,9 +42,10 @@ private[spark] abstract class AbstractEsRDDIterator[T](

private var initialized = false;

lazy val reader = {
private val settings = partition.settings()

private val reader = {
initialized = true
val settings = partition.settings()

// initialize mapping/ scroll reader
initReader(settings, log)
@@ -56,7 +58,10 @@ }
}

// Register an on-task-completion callback to close the input stream.
CompatUtils.addOnCompletition(context, () => closeIfNeeded())
CompatUtils.addOnCompletition(context, () => {
EsSparkAccumulators.add(reader.stats(), settings.getMetricsPrefix)
closeIfNeeded()
})

def hasNext: Boolean = {
if (CompatUtils.isInterrupted(context)) {
EsRDDWriter.scala (package org.elasticsearch.spark.rdd)
@@ -35,6 +35,7 @@ import org.elasticsearch.hadoop.serialization.field.FieldExtractor
import org.elasticsearch.hadoop.serialization.bulk.MetadataExtractor
import org.elasticsearch.hadoop.serialization.bulk.PerEntityPoolingMetadataExtractor
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.acc.EsSparkAccumulators
import org.elasticsearch.spark.serialization.ScalaMapFieldExtractor
import org.elasticsearch.spark.serialization.ScalaMetadataExtractor
import org.elasticsearch.spark.serialization.ScalaValueWriter
@@ -71,7 +72,10 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

val listener = new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = writer.close()
override def onTaskCompletion(context: TaskContext): Unit = {
EsSparkAccumulators.add(writer.repository.stats(), settings.getMetricsPrefix)
writer.close()
}
}
taskContext.addTaskCompletionListener(listener)

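Both the reader (previous file) and the writer above flush their per-task `Stats` into the accumulators from a task-completion callback. A hedged, stand-alone sketch of that pattern using Spark's listener API; `FakeStats` and `FakeWriter` are illustrative stand-ins, not types from this PR:

[source, scala]
----
import org.apache.spark.TaskContext
import org.apache.spark.util.{LongAccumulator, TaskCompletionListener}

object TaskCompletionSketch {
  class FakeStats { var docsSent: Long = 0 }                             // stand-in for rest.stats.Stats
  class FakeWriter { val stats = new FakeStats; def close(): Unit = () } // stand-in for the bulk writer

  def runTask(taskContext: TaskContext, docsSentAcc: LongAccumulator): Unit = {
    val writer = new FakeWriter
    taskContext.addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = {
        // fold the task-local stats into the shared accumulator, then release resources
        docsSentAcc.add(writer.stats.docsSent)
        writer.close()
      }
    })
    // ... the task writes documents and updates writer.stats.docsSent ...
  }
}
----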
EsSpark.scala (package org.elasticsearch.spark.rdd)
@@ -32,6 +32,7 @@ import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.rest.InitializationUtils
import org.elasticsearch.spark.acc.EsSparkAccumulators

object EsSpark {

@@ -94,10 +95,12 @@
if (rdd == null || rdd.partitions.length == 0) {
return
}

val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)

val sc = rdd.sparkContext
val sparkCfg = new SparkSettingsManager().load(sc.getConf)
val config = new PropertiesSettings().load(sparkCfg.save())
config.merge(cfg.asJava)
EsSparkAccumulators.build(sc, config.getMetricsPrefix)

// Need to discover the EsVersion here before checking if the index exists
InitializationUtils.setUserProviderIfNotSet(config, classOf[HadoopUserProvider], LOG)
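Because `build` is driven by the merged configuration, each `saveToEs` call can be given its own accumulator namespace through the per-call `cfg` map, which is what the tests below rely on. A hedged usage sketch; the index resources and prefixes are illustrative:

[source, scala]
----
import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark

def saveWithPrefixes(sc: SparkContext): Unit = {
  val rdd = sc.makeRDD(Seq(Map("field1" -> "doc1"), Map("field1" -> "doc2")))
  // each call gets its own metrics prefix, so its DOCS_SENT etc. are accumulated separately
  EsSpark.saveToEs(rdd, "daily-index/data", Map("es.metrics.prefix" -> "daily"))
  EsSpark.saveToEs(rdd, "weekly-index/data", Map("es.metrics.prefix" -> "weekly"))
}
----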
AbstractScalaEsScalaSparkSQL.scala (Spark SQL integration test)
@@ -35,9 +35,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.Decimal
import org.apache.spark.sql.types.DecimalType
@@ -87,6 +85,7 @@ import org.junit.runners.Parameterized.Parameters
import com.esotericsoftware.kryo.io.{Input => KryoInput}
import com.esotericsoftware.kryo.io.{Output => KryoOutput}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.acc.EsSparkAccumulators

import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.SparkSession
@@ -2541,6 +2540,49 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertThat(head.getString(0), containsString("Chipotle"))
}

@Test
def testSparkAccumulators() = {
def config(metricsPrefix: String): Map[String, String] = {
val index = wrapIndex(s"spark-test-accumulators-${metricsPrefix}")
val typename = "data"
val target = resource(index, typename, version)
Map[String, String](
ES_RESOURCE_READ -> target,
ES_RESOURCE_WRITE -> target,
ES_METRICS_PREFIX -> metricsPrefix
)
}

val data = Seq(
Row("doc1"),
Row("doc2"),
Row("doc3")
)
val inputRdd = sc.makeRDD(data)
val schema = new StructType()
.add("field1", StringType, nullable = false)
val df = sqc.createDataFrame(inputRdd, schema)

df.saveToEs(config("single-write"))

val doubleWritesConfig = config("double-write")
df.saveToEs(doubleWritesConfig)
df.saveToEs(doubleWritesConfig)

val readWriteConfig = config("read-write")
df.saveToEs(readWriteConfig)
val outputDf = sqc.esDF(readWriteConfig)

import org.elasticsearch.hadoop.mr.Counter._
assertEquals(inputRdd.count(), EsSparkAccumulators.get("single-write", DOCS_SENT))
assertEquals(inputRdd.count() * 2, EsSparkAccumulators.get("double-write", DOCS_SENT))
assertEquals(inputRdd.count(), EsSparkAccumulators.get("read-write", DOCS_SENT))
assertEquals(0, EsSparkAccumulators.get("read-write", DOCS_RECEIVED)) // reads are lazy: nothing is received until the count() action below runs
assertEquals(outputDf.count(), EsSparkAccumulators.get("read-write", DOCS_RECEIVED))
assertEquals(outputDf.count() * 2, EsSparkAccumulators.get("read-write", DOCS_RECEIVED))
EsSparkAccumulators.clear()
}

/**
* Take advantage of the fixed method order and clear out all created indices.
* The indices will last in Elasticsearch for all parameters of this test suite.
EsSparkSQL.scala (package org.elasticsearch.spark.sql)
@@ -31,6 +31,7 @@ import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
import org.elasticsearch.hadoop.rest.InitializationUtils
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.acc.EsSparkAccumulators
import org.elasticsearch.spark.cfg.SparkSettingsManager

import scala.collection.JavaConverters.mapAsJavaMapConverter
@@ -93,6 +94,7 @@ object EsSparkSQL {
val sparkCfg = new SparkSettingsManager().load(sparkCtx.getConf)
val esCfg = new PropertiesSettings().load(sparkCfg.save())
esCfg.merge(cfg.asJava)
EsSparkAccumulators.build(sparkCtx, esCfg.getMetricsPrefix)

// Need to discover ES Version before checking index existence
InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
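A similar hedged sketch on the Spark SQL side, assuming the `org.elasticsearch.spark.sql` implicits; the DataFrame contents, index resource and prefix are illustrative:

[source, scala]
----
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.acc.EsSparkAccumulators
import org.elasticsearch.spark.sql._

object SqlAccumulatorsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sql-acc-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("doc1", "doc2", "doc3").toDF("field1")
    df.saveToEs("spark-sql-test/data", Map("es.metrics.prefix" -> "sql-write"))

    // inspect the driver-side totals once the write has completed
    EsSparkAccumulators.getAll.foreach(a => println(s"${a.name.getOrElse("")} = ${a.value}"))
  }
}
----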