TransportPool validation failures are not handled correctly in Spark environment #1362

Closed
nvoskresenskiy opened this issue Oct 4, 2019 · 6 comments

@nvoskresenskiy

What kind of issue is this?

[x] Bug report.

Issue description

When elasticsearch-hadoop is used with Spark Structured Streaming, the method org.elasticsearch.hadoop.rest.pooling.TransportPool#validate rethrows an org.elasticsearch.hadoop.EsHadoopException instead of returning false when validation fails.

Steps to reproduce

  1. Create a standard Spark Structured Streaming query with an Elasticsearch sink. Make sure ES HTTP transport pooling is enabled
  2. Set up an Elasticsearch cluster with two or more masters
  3. Run the query with continuous incoming test data
  4. Shut down the first master node

Expected:
Processing continues seamlessly and the transport pool connects to the next master node in the configuration
Actual:
Spark task fails with the stack trace below

Stack trace:


19/10/03 21:23:25,628 WARN TaskSetManager: Lost task 2.0 in stage 201.0 (TID 1273, ************, executor 1): org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host [************:9200]
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:110)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.create(PooledHttpTransportFactory.java:56)
	at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:102)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:85)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:61)
	at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:99)
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:93)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:582)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
	at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.EsHadoopException: java.net.ConnectException: Connection refused (Connection refused)
	at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:68)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:656)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.validate(TransportPool.java:102)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.borrowTransport(TransportPool.java:131)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:107)
	... 19 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at sun.reflect.GeneratedMethodAccessor881.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.commons.httpclient.protocol.ReflectionSocketFactory.createSocket(ReflectionSocketFactory.java:140)
	at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:125)
	at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
	at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
	at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.doExecute(CommonsHttpTransport.java:685)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.access$200(CommonsHttpTransport.java:73)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport$2.run(CommonsHttpTransport.java:659)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:66)
	... 23 more


The issue appears to be caused by org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport#execute calling org.elasticsearch.hadoop.security.User#doAs(java.security.PrivilegedExceptionAction) when a UserProvider is defined; doAs wraps any IOException produced by the HTTP call in an EsHadoopException. The method org.elasticsearch.hadoop.rest.pooling.TransportPool#validate, however, expects an IOException from the transport and does not handle an EsHadoopException, so the exception propagates to the caller instead of being treated as a failed validation.
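
The mismatch can be illustrated with a small self-contained sketch. The names below (ValidateMismatchSketch, WrappedException, ping) are hypothetical stand-ins and not the actual elasticsearch-hadoop classes; only the control flow is taken from the description in this issue, and the real validate method may differ in detail.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.security.PrivilegedExceptionAction;

class ValidateMismatchSketch {

    // Hypothetical stand-in for EsHadoopException (an unchecked exception).
    static class WrappedException extends RuntimeException {
        WrappedException(Throwable cause) {
            super(cause);
        }
    }

    // Stand-in for User#doAs as described above: any exception thrown
    // by the action, including IOException, is wrapped before it is rethrown.
    static <T> T doAs(PrivilegedExceptionAction<T> action) {
        try {
            return action.run();
        } catch (Exception e) {
            throw new WrappedException(e);
        }
    }

    // Stand-in for the HTTP call against a node that is down.
    static boolean ping() throws IOException {
        throw new ConnectException("Connection refused");
    }

    // Assumed shape of validate: only IOException counts as "validation failed".
    static boolean validate(boolean userProviderDefined) {
        try {
            if (userProviderDefined) {
                PrivilegedExceptionAction<Boolean> action = ValidateMismatchSketch::ping;
                return doAs(action); // the IOException arrives wrapped and escapes the catch below
            }
            return ping();
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without user provider: " + validate(false));
        try {
            validate(true);
        } catch (WrappedException e) {
            System.out.println("with user provider, validate threw: " + e.getCause());
        }
    }
}
```

Without the wrapping step the connection failure is reported as a failed validation; with it, the same failure surfaces as an unchecked exception in the caller, which matches the nested "Caused by" chain in the stack trace above.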

Version Info

OS: GNU/Linux (Docker) 3.10.0-957.1.3.el7.x86_64
JVM: OpenJDK 64-Bit Server VM (Zulu 8.34.0.1-CA-linux64) (build 25.201-b10, mixed mode)
Hadoop/Spark: Spark 2.4.4
ES-Hadoop: 6.7.2
ES: 6.7.2

@danielyahn

We ran into the same issue. One of the nodes listed in es.nodes went offline, and the Structured Streaming job failed with the same error. Interestingly, we were able to restart the job even while that node was still offline.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 800.0 failed 4 times, most recent failure: Lost task 77.3 in stage 800.0 (TID 461529, ******, executor 171): org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host [******:9200]
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:110)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.create(PooledHttpTransportFactory.java:56)
	at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:102)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:85)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:61)
	at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:99)
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:93)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:582)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
	at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.EsHadoopException: java.net.ConnectException: Connection refused (Connection refused)
	at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:68)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:656)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.validate(TransportPool.java:102)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.borrowTransport(TransportPool.java:131)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:107)
	... 17 more

@tnycum

tnycum commented Jan 24, 2020

Ran into this issue as well, identical to what is described in the comments above. One of the nodes in es.nodes ran into issues and shut down, which caused the Spark Streaming job to fail repeatedly until I removed the offline host from es.nodes.

Stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 554 in stage 1.0 failed 4 times, most recent failure: Lost task 554.3 in stage 1.0 (TID 1254, *********, executor 79): org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host [********:9200]
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:110)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.create(PooledHttpTransportFactory.java:56)
	at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:102)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:85)
	at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:61)
	at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:99)
	at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:158)
	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:584)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
	at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
	at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.EsHadoopException: java.net.ConnectException: Connection refused (Connection refused)
	at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:68)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:656)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.validate(TransportPool.java:102)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool.borrowTransport(TransportPool.java:131)
	at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:107)
	... 17 more

@kuldeep27396

Is there a solution to this?
I'm facing the same issue now. Is there any workaround or fix?

org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host

@masseyke
Member

@jbaiera I bet this one was caused by #1244, with the addition of this line. That would be easy enough to fix here: we could just change validate to catch EsHadoopException and check whether the root cause is an IOException. But I wonder whether that change has broken other things, since there are other places where we have special logic for IOException. Maybe we ought to change User.doAs() to throw IOException?
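
For illustration only, the first option (catching the wrapper and checking the cause chain for an IOException) might look roughly like the sketch below. This is not the code from any actual pull request; RootCauseValidateSketch, WrappedException, and Ping are hypothetical names standing in for the real classes.

```java
import java.io.IOException;
import java.net.ConnectException;

class RootCauseValidateSketch {

    // Hypothetical stand-in for EsHadoopException (an unchecked exception).
    static class WrappedException extends RuntimeException {
        WrappedException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical stand-in for the transport call made during validation.
    interface Ping {
        boolean run() throws IOException;
    }

    // Walks the cause chain looking for an IOException. The depth cap is a
    // defensive guard against a cyclic cause chain.
    static boolean causedByIOException(Throwable t) {
        int depth = 0;
        for (Throwable c = t; c != null && depth < 32; c = c.getCause(), depth++) {
            if (c instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    // Treats both a direct IOException and a wrapped one as "validation failed";
    // any other wrapped failure is still rethrown to the caller.
    static boolean validate(Ping ping) {
        try {
            return ping.run();
        } catch (IOException e) {
            return false;
        } catch (WrappedException e) {
            if (causedByIOException(e)) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        boolean ok = validate(() -> {
            throw new WrappedException(new ConnectException("Connection refused"));
        });
        System.out.println("wrapped connection failure validates as: " + ok);
    }
}
```

A check like this only covers validate. The other places with special logic for IOException would each need the same treatment, which is the trade-off against changing doAs itself.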

@masseyke
Member

I opened #2150 as a potential fix for this.

@masseyke
Member

masseyke commented Nov 6, 2023

Fixed by #2150

@masseyke masseyke closed this as completed Nov 6, 2023