Skip to content

Commit caa4b73

Browse files
authored
Support spark 3.4
1 parent 936f6df commit caa4b73

29 files changed

+2811
-0
lines changed

azure-pipelines-pr.yml

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ parameters:
7878
- '3.3.2'
7979
- '3.3.3'
8080
- '3.3.4'
81+
- '3.4.0'
82+
- '3.4.1'
83+
- '3.4.2'
84+
- '3.4.3'
85+
- '3.4.4'
8186
- '3.5.0'
8287
- '3.5.1'
8388
- '3.5.2'

src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public DeltaFixture()
2727
(3, 3, 2) => "delta-core_2.12:2.3.0",
2828
(3, 3, 3) => "delta-core_2.12:2.3.0",
2929
(3, 3, 4) => "delta-core_2.12:2.3.0",
30+
(3, 4, _) => "delta-core_2.12:2.4.0",
3031
(3, 5, _) => "delta-spark_2.12:3.2.0",
3132
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
3233
};

src/scala/microsoft-spark-3-4/pom.xml

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Maven build for the microsoft-spark-3-4_2.12 module: Scala sources compiled against the Spark and Scala versions set in the properties section below. -->
3+
<modelVersion>4.0.0</modelVersion>
4+
<!-- Inherits from the microsoft-spark parent; the parent version is taken from the microsoft-spark.version property, which is not defined in this file. -->
<parent>
5+
<groupId>com.microsoft.scala</groupId>
6+
<artifactId>microsoft-spark</artifactId>
7+
<version>${microsoft-spark.version}</version>
8+
</parent>
9+
<artifactId>microsoft-spark-3-4_2.12</artifactId>
10+
<inceptionYear>2019</inceptionYear>
11+
<!-- scala.version, scala.binary.version and spark.version are referenced by the dependencies and by the Scala compiler plugin further down. -->
<properties>
12+
<encoding>UTF-8</encoding>
13+
<scala.version>2.12.18</scala.version>
14+
<scala.binary.version>2.12</scala.binary.version>
15+
<spark.version>3.4.0</spark.version>
16+
</properties>
17+
18+
<dependencies>
19+
<dependency>
20+
<groupId>org.scala-lang</groupId>
21+
<artifactId>scala-library</artifactId>
22+
<version>${scala.version}</version>
23+
</dependency>
24+
<!-- The three Spark artifacts (core, sql, mllib) are declared with the provided scope; presumably the Spark runtime supplies them at execution time (confirm against the packaging setup). -->
<dependency>
25+
<groupId>org.apache.spark</groupId>
26+
<artifactId>spark-core_${scala.binary.version}</artifactId>
27+
<version>${spark.version}</version>
28+
<scope>provided</scope>
29+
</dependency>
30+
<dependency>
31+
<groupId>org.apache.spark</groupId>
32+
<artifactId>spark-sql_${scala.binary.version}</artifactId>
33+
<version>${spark.version}</version>
34+
<scope>provided</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.apache.spark</groupId>
38+
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
39+
<version>${spark.version}</version>
40+
<scope>provided</scope>
41+
</dependency>
42+
<!-- Test-scoped dependencies. -->
<dependency>
43+
<groupId>junit</groupId>
44+
<artifactId>junit</artifactId>
45+
<version>4.13.1</version>
46+
<scope>test</scope>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.specs</groupId>
50+
<artifactId>specs</artifactId>
51+
<version>1.2.5</version>
52+
<scope>test</scope>
53+
</dependency>
54+
</dependencies>
55+
56+
<build>
57+
<sourceDirectory>src/main/scala</sourceDirectory>
58+
<testSourceDirectory>src/test/scala</testSourceDirectory>
59+
<plugins>
60+
<!-- Scala compiler plugin: runs the compile and testCompile goals using the scalaVersion configured from scala.version, passing the scalac arguments listed under args. -->
<plugin>
61+
<groupId>org.scala-tools</groupId>
62+
<artifactId>maven-scala-plugin</artifactId>
63+
<version>2.15.2</version>
64+
<executions>
65+
<execution>
66+
<goals>
67+
<goal>compile</goal>
68+
<goal>testCompile</goal>
69+
</goals>
70+
</execution>
71+
</executions>
72+
<configuration>
73+
<scalaVersion>${scala.version}</scalaVersion>
74+
<args>
75+
<arg>-target:jvm-1.8</arg>
76+
<arg>-deprecation</arg>
77+
<arg>-feature</arg>
78+
</args>
79+
</configuration>
80+
</plugin>
81+
</plugins>
82+
</build>
83+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the .NET Foundation under one or more agreements.
3+
* The .NET Foundation licenses this file to you under the MIT license.
4+
* See the LICENSE file in the project root for more information.
5+
*/
6+
7+
package org.apache.spark.api.dotnet
8+
9+
import java.io.DataOutputStream
10+
11+
import org.apache.spark.internal.Logging
12+
13+
import scala.collection.mutable.Queue
14+
15+
/**
 * CallbackClient is used to communicate with the Dotnet CallbackServer.
 * The client manages and maintains a pool of open CallbackConnections.
 * Any callback request is delegated to a new CallbackConnection or
 * unused CallbackConnection.
 *
 * Access to the pool and to the shutdown flag is guarded by this object's monitor.
 *
 * @param serDe The SerDe passed to every CallbackConnection created by this client
 * @param address The address of the Dotnet CallbackServer
 * @param port The port of the Dotnet CallbackServer
 */
class CallbackClient(serDe: SerDe, address: String, port: Int) extends Logging {
  private[this] val connectionPool: Queue[CallbackConnection] = Queue[CallbackConnection]()

  private[this] var isShutdown: Boolean = false

  /**
   * Sends a callback request over a pooled (or newly created) connection.
   *
   * On success the connection is handed back to the pool. On failure the error is
   * logged, the connection is closed (it is not reused) and the exception is rethrown.
   *
   * @param callbackId The id of the callback to invoke
   * @param writeBody Writes the request body to the supplied stream using the supplied SerDe
   * @throws Exception if no connection can be obtained (the client is shutdown) or
   *                   if the underlying connection fails to send
   */
  final def send(callbackId: Int, writeBody: (DataOutputStream, SerDe) => Unit): Unit =
    getOrCreateConnection() match {
      case Some(connection) =>
        try {
          connection.send(callbackId, writeBody)
          addConnection(connection)
        } catch {
          case e: Exception =>
            logError(s"Error calling callback [callback id = $callbackId].", e)
            connection.close()
            throw e
        }
      case None => throw new Exception("Unable to get or create connection.")
    }

  /**
   * Returns an idle pooled connection if there is one, otherwise opens a new one.
   * Returns None once the client has been shutdown.
   */
  private def getOrCreateConnection(): Option[CallbackConnection] = synchronized {
    if (isShutdown) {
      logInfo("Cannot get or create connection while client is shutdown.")
      None
    } else if (connectionPool.nonEmpty) {
      Some(connectionPool.dequeue())
    } else {
      Some(new CallbackConnection(serDe, address, port))
    }
  }

  /**
   * Hands a connection back to the pool after a successful send.
   *
   * If shutdown() ran while the connection was in use, the pool has already been
   * drained and will never be drained again, so the connection is closed here
   * instead of being enqueued; otherwise its socket would leak.
   */
  private def addConnection(connection: CallbackConnection): Unit = {
    assert(connection != null)
    val pooled = synchronized {
      if (isShutdown) {
        false
      } else {
        connectionPool.enqueue(connection)
        true
      }
    }
    // Close outside the monitor so socket I/O does not block other callers.
    if (!pooled) {
      logInfo("Client is shutdown; closing returned connection instead of pooling it.")
      connection.close()
    }
  }

  /**
   * Closes every pooled connection and marks the client as shutdown.
   * Calling it again after a completed shutdown only logs a message.
   */
  def shutdown(): Unit = synchronized {
    if (isShutdown) {
      logInfo("Shutdown called, but already shutdown.")
    } else {
      logInfo("Shutting down.")
      connectionPool.foreach(_.close())
      connectionPool.clear()
      isShutdown = true
    }
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the .NET Foundation under one or more agreements.
3+
* The .NET Foundation licenses this file to you under the MIT license.
4+
* See the LICENSE file in the project root for more information.
5+
*/
6+
7+
package org.apache.spark.api.dotnet
8+
9+
import java.io.{ByteArrayOutputStream, Closeable, DataInputStream, DataOutputStream}
10+
import java.net.Socket
11+
12+
import org.apache.spark.internal.Logging
13+
14+
/**
 * CallbackConnection carries a single callback conversation channel between the
 * JVM and Dotnet. It opens a TCP socket to the Dotnet CallbackServer when it is
 * constructed, and the same socket is expected to be reused for later callbacks.
 *
 * @param serDe The SerDe used to read from and write to the socket streams
 * @param address The address of the Dotnet CallbackServer
 * @param port The port of the Dotnet CallbackServer
 */
class CallbackConnection(serDe: SerDe, address: String, port: Int) extends Logging {
  private[this] val socket: Socket = new Socket(address, port)
  private[this] val inputStream: DataInputStream = new DataInputStream(socket.getInputStream)
  private[this] val outputStream: DataOutputStream = new DataOutputStream(socket.getOutputStream)

  /**
   * Invokes a callback on the Dotnet side and waits for its acknowledgement.
   *
   * @param callbackId The id of the callback to invoke
   * @param writeBody Produces the request body; it is given a scratch stream and the SerDe
   * @throws Exception wrapping any Exception raised while writing the request or
   *                   while verifying the end-of-stream acknowledgement
   */
  def send(
      callbackId: Int,
      writeBody: (DataOutputStream, SerDe) => Unit): Unit = {
    logInfo(s"Calling callback [callback id = $callbackId] ...")
    writeRequest(callbackId, writeBody)

    logInfo("Signaling END_OF_STREAM.")
    confirmEndOfStream(callbackId)
  }

  /**
   * Writes the CALLBACK flag, the callback id, the body length and then the body.
   * The body is buffered in memory first so its size can be sent ahead of it.
   */
  private def writeRequest(
      callbackId: Int,
      writeBody: (DataOutputStream, SerDe) => Unit): Unit =
    try {
      serDe.writeInt(outputStream, CallbackFlags.CALLBACK)
      serDe.writeInt(outputStream, callbackId)

      val bodyBuffer = new ByteArrayOutputStream()
      writeBody(new DataOutputStream(bodyBuffer), serDe)
      serDe.writeInt(outputStream, bodyBuffer.size)
      bodyBuffer.writeTo(outputStream)
    } catch {
      case cause: Exception => throw new Exception("Error writing to stream.", cause)
    }

  /**
   * Sends END_OF_STREAM, flushes, and checks that the peer answers with END_OF_STREAM.
   */
  private def confirmEndOfStream(callbackId: Int): Unit =
    try {
      serDe.writeInt(outputStream, CallbackFlags.END_OF_STREAM)
      outputStream.flush()

      val response = readResponseFlag()
      if (response != CallbackFlags.END_OF_STREAM) {
        throw new Exception(s"Error verifying end of stream. Expected: ${CallbackFlags.END_OF_STREAM}, " +
          s"Received: $response")
      }
      logInfo(s"Received END_OF_STREAM signal. Calling callback [callback id = $callbackId] successful.")
    } catch {
      case cause: Exception => throw new Exception("Error while verifying end of stream.", cause)
    }

  /**
   * Tells the peer the connection is closing (best effort), then closes the socket
   * followed by both streams. Exceptions raised along the way are logged, not thrown.
   */
  def close(): Unit = {
    try {
      serDe.writeInt(outputStream, CallbackFlags.CLOSE)
      outputStream.flush()
    } catch {
      case cause: Exception => logInfo("Unable to send close to .NET callback server.", cause)
    }

    closeQuietly(socket, "Unable to close socket.")
    closeQuietly(outputStream, "Unable to close closeable.")
    closeQuietly(inputStream, "Unable to close closeable.")
  }

  /** Closes `resource`, logging `failureMessage` if closing raises an Exception. */
  private def closeQuietly(resource: Closeable, failureMessage: String): Unit = {
    try {
      assert(resource != null)
      resource.close()
    } catch {
      case cause: Exception => logInfo(failureMessage, cause)
    }
  }

  /**
   * Reads the next flag from the peer. A DOTNET_EXCEPTION_THROWN flag is followed by
   * a message string, which is rethrown here as a DotnetException.
   */
  private def readResponseFlag(): Int = {
    val flag = serDe.readInt(inputStream)
    if (flag != CallbackFlags.DOTNET_EXCEPTION_THROWN) {
      flag
    } else {
      throw new DotnetException(serDe.readString(inputStream))
    }
  }

  /** Integer flags exchanged with the Dotnet CallbackServer. */
  private object CallbackFlags {
    val CLOSE: Int = -1
    val CALLBACK: Int = -2
    val DOTNET_EXCEPTION_THROWN: Int = -3
    val END_OF_STREAM: Int = -4
  }
}

0 commit comments

Comments
 (0)