This section lists the functionality that Opaque supports, which is a subset of Spark SQL's. Note that the syntax for this functionality is the same as in Spark SQL -- Opaque simply replaces the execution so that it works over encrypted data.
Out of the existing Spark SQL types, Opaque supports:

- All numeric types. `DecimalType` is supported via conversion into `FloatType`
- `StringType`
- `BinaryType`
- `BooleanType`
- `TimestampType`, `DateType`
- `ArrayType`, `MapType`
Warning: Conversions from `FloatType` to `StringType` may produce incorrect results in the least significant portion of the floating-point value. See this issue for further details.
We currently support a subset of the Spark SQL functions, including both scalar and aggregate-like functions.

- Scalar functions: `case`, `cast`, `concat`, `contains`, `if`, `in`, `like`, `substring`, `upper`
- Aggregate functions: `average`, `count`, `first`, `last`, `max`, `min`, `sum`
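Because the syntax is unchanged, these functions are called exactly as in vanilla Spark SQL. The snippet below is a minimal sketch, assuming a running SparkSession `spark` with Opaque initialized and the `.encrypted` DataFrame conversion from Opaque's usage guide; the data and column names are purely illustrative.

```scala
// Minimal sketch. Assumes a running SparkSession `spark` with Opaque
// initialized, and the `.encrypted` conversion provided by the Opaque
// implicits (see the usage guide). Data and column names are illustrative.
import spark.implicits._
import org.apache.spark.sql.functions.{avg, sum, upper}
import edu.berkeley.cs.rise.opaque.implicits._

val words = Seq(("foo", 4), ("bar", 1), ("baz", 5))
  .toDF("word", "count")
  .encrypted // subsequent operators execute over encrypted data

// Scalar function (`upper`) with standard Spark SQL syntax:
val uppercased = words.select(upper($"word").as("word"), $"count")

// Global aggregation (`sum`, `average`) with standard Spark SQL syntax:
val totals = uppercased.agg(sum($"count").as("total"), avg($"count").as("mean"))
totals.show()
```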
UDFs are not supported directly, but one can :ref:`extend Opaque with additional functions <udfs>` by writing them in C++.
Opaque supports the core SQL operators:

- Projection (e.g., `SELECT` statements)
- Filter
- Global aggregation and grouping aggregation
- Order by, sort by
- All join types except cross join, full outer join, and existence join
- Limit
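Under the same assumptions as the sketch above (an initialized Opaque session and encrypted DataFrames obtained via `.encrypted`), these operators compose exactly as in vanilla Spark; `users` and `orders` below are hypothetical encrypted DataFrames, and the column names are made up for the example.

```scala
// Sketch only: `users` and `orders` are hypothetical encrypted DataFrames,
// e.g. obtained via `.encrypted` as in the previous example.
import spark.implicits._
import org.apache.spark.sql.functions.sum

val report = orders
  .filter($"amount" > 100)          // filter
  .join(users, Seq("user_id"))      // join (cross and full outer joins excluded)
  .groupBy($"country")              // grouping aggregation
  .agg(sum($"amount").as("total"))
  .orderBy($"total".desc)           // order by
  .limit(10)                        // limit
```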
Because Opaque SQL only replaces physical operators to work with encrypted data, the DataFrame interface is exactly the same as Spark's, for both Scala and Python. Opaque SQL is still a work in progress, so not all of Spark's DataFrame functionality is implemented yet. See below for the complete list of currently supported methods, given as Scala signatures.
- collect(): Array[T]
- collectAsList(): List[T]
- count(): Long
- first(): T
- foreach(func: ForeachFunction[T]): Unit
- foreach(f: T => Unit): Unit
- foreachPartition(func: ForeachPartitionFunction[T]): Unit
- foreachPartition(f: Iterator[T] => Unit): Unit
- head(): T
- head(n: Int): Array[T]
- show(numRows: Int, truncate: Int, vertical: Boolean): Unit
- show(numRows: Int, truncate: Int): Unit
- show(numRows: Int, truncate: Boolean): Unit
- show(truncate: Boolean): Unit
- show(): Unit
- show(numRows: Int): Unit
- take(n: Int): Array[T]
- takeAsList(n: Int): List[T]
- toLocalIterator(): Iterator[T]
- cache(): Dataset.this.type
- columns: Array[String]
- createGlobalTempView(viewName: String): Unit
- createOrReplaceGlobalTempView(viewName: String): Unit
- createOrReplaceTempView(viewName: String): Unit
- createTempView(viewName: String): Unit
- dtypes: Array[(String, String)]
- explain(): Unit
- explain(extended: Boolean): Unit
- explain(mode: String): Unit
- hint(name: String, parameters: Any*): Dataset[T]
- inputFiles: Array[String]
- isEmpty: Boolean
- isLocal: Boolean
- javaRDD: JavaRDD[T]
- localCheckpoint(eager: Boolean): Dataset[T]
- localCheckpoint(): Dataset[T]
- printSchema(level: Int): Unit
- printSchema(): Unit
- rdd: org.apache.spark.rdd.RDD[T]
- schema: types.StructType
- storageLevel: org.apache.spark.storage.StorageLevel
- toDF(colNames: String*): DataFrame
- toDF(): DataFrame
- toJavaRDD: JavaRDD[T]
- unpersist(): Dataset.this.type
- unpersist(blocking: Boolean): Dataset.this.type
- write: DataFrameWriter[T]
- writeStream: streaming.DataStreamWriter[T]
- writeTo(table: String): DataFrameWriterV2[T]
- registerTempTable(tableName: String): Unit
- alias(alias: Symbol): Dataset[T]
- alias(alias: String): Dataset[T]
- as(alias: Symbol): Dataset[T]
- as(alias: String): Dataset[T]
- coalesce(numPartitions: Int): Dataset[T]
- distinct(): Dataset[T]
- dropDuplicates(col1: String, cols: String*): Dataset[T]
- dropDuplicates(colNames: Array[String]): Dataset[T]
- dropDuplicates(colNames: Seq[String]): Dataset[T]
- dropDuplicates(): Dataset[T]
- filter(func: FilterFunction[T]): Dataset[T]
- filter(func: T => Boolean): Dataset[T]
- filter(conditionExpr: String): Dataset[T]
- filter(condition: Column): Dataset[T]
- flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U]
- flatMap[U](func: T => TraversableOnce[U])(implicit evidence: Encoder[U]): Dataset[U]
- groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T]
- groupByKey[K](func: T => K)(implicit evidence: Encoder[K]): KeyValueGroupedDataset[K, T]
- joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
- joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
- limit(n: Int): Dataset[T]
- map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U]
- map[U](func: T => U)(implicit evidence: Encoder[U]): Dataset[U]
- mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U]
- mapPartitions[U](func: Iterator[T] => Iterator[U])(implicit evidence: Encoder[U]): Dataset[U]
- orderBy(sortExprs: Column*): Dataset[T]
- orderBy(sortCol: String, sortCols: String*): Dataset[T]
- randomSplit(weights: Array[Double]): Array[Dataset[T]]
- randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
- randomSplitAsList(weights: Array[Double], seed: Long): List[Dataset[T]]
- repartition(partitionExprs: Column*): Dataset[T]
- repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
- repartition(numPartitions: Int): Dataset[T]
- repartitionByRange(partitionExprs: Column*): Dataset[T]
- repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
- select[U1, U2, U3, U4, U5](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4], c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
- select[U1, U2, U3, U4](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
- select[U1, U2, U3](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
- select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
- select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
- sort(sortExprs: Column*): Dataset[T]
- sort(sortCol: String, sortCols: String*): Dataset[T]
- sortWithinPartitions(sortExprs: Column*): Dataset[T]
- sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
- transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]
- union(other: Dataset[T]): Dataset[T]
- unionAll(other: Dataset[T]): Dataset[T]
- unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T]
- unionByName(other: Dataset[T]): Dataset[T]
- where(conditionExpr: String): Dataset[T]
- where(condition: Column): Dataset[T]
- agg(expr: Column, exprs: Column*): DataFrame
- agg(exprs: Map[String, String]): DataFrame
- agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
- apply(colName: String): Column
- col(colName: String): Column
- colRegex(colName: String): Column
- drop(col: Column): DataFrame
- drop(colNames: String*): DataFrame
- drop(colName: String): DataFrame
- groupBy(col1: String, cols: String*): RelationalGroupedDataset
- groupBy(cols: Column*): RelationalGroupedDataset
- hashCode(): Int
- join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
- join(right: Dataset[_], joinExprs: Column): DataFrame
- join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
- join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
- join(right: Dataset[_], usingColumn: String): DataFrame
- join(right: Dataset[_]): DataFrame
- na: DataFrameNaFunctions
- select(col: String, cols: String*): DataFrame
- select(cols: Column*): DataFrame
- selectExpr(exprs: String*): DataFrame
- stat: DataFrameStatFunctions
- withColumn(colName: String, col: Column): DataFrame
- withColumnRenamed(existingName: String, newName: String): DataFrame
- encoder: Encoder[T]
- queryExecution: execution.QueryExecution
- sameSemantics(other: Dataset[T]): Boolean
- semanticHash(): Int
- sparkSession: SparkSession
- sqlContext: SQLContext
- toJSON: Dataset[String]
- toString(): String
- describe(cols: String*): DataFrame
- reduce(func: ReduceFunction[T]): T
- reduce(func: (T, T) => T): T
- summary(statistics: String*): DataFrame
- tail(n: Int): Array[T]
- as[U](implicit evidence: Encoder[U]): Dataset[U]
- checkpoint(eager: Boolean): Dataset[T]
- checkpoint(): Dataset[T]
- persist(newLevel: org.apache.spark.storage.StorageLevel): Dataset.this.type
- persist(): Dataset.this.type
- except(other: Dataset[T]): Dataset[T]
- exceptAll(other: Dataset[T]): Dataset[T]
- intersect(other: Dataset[T]): Dataset[T]
- intersectAll(other: Dataset[T]): Dataset[T]
- observe(name: String, expr: Column, exprs: Column*): Dataset[T]
- sample(withReplacement: Boolean, fraction: Double): Dataset[T]
- sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
- sample(fraction: Double): Dataset[T]
- sample(fraction: Double, seed: Long): Dataset[T]
- crossJoin(right: Dataset[_]): DataFrame
- cube(col1: String, cols: String*): RelationalGroupedDataset
- cube(cols: Column*): RelationalGroupedDataset
- rollup(col1: String, cols: String*): RelationalGroupedDataset
- rollup(cols: Column*): RelationalGroupedDataset
- explode[A, B](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])(implicit evidence: reflect.runtime.universe.TypeTag[B]): DataFrame
- explode[A <: Product](input: Column*)(f: Row => TraversableOnce[A])(implicit evidence: reflect.runtime.universe.TypeTag[A]): DataFrame
Note: Cross joins and full outer joins are not supported, and aggregations with more than one distinct aggregate expression are not supported.
To run a Spark SQL UDF within Opaque enclaves, first name it explicitly and define it in Scala, then reimplement it in C++ against Opaque's serialized row representation.
For example, suppose we wish to implement a UDF called `dot`, which computes the dot product of two double arrays (`Array[Double]`). We [define it in Scala](src/main/scala/edu/berkeley/cs/rise/opaque/expressions/DotProduct.scala) in terms of the Breeze linear algebra library's implementation. We can then use it in a DataFrame query, such as logistic regression.
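The exact contents of `DotProduct.scala` are not reproduced here, but a rough sketch of the Scala side might look like the following: a Catalyst `BinaryExpression` named `DotProduct` whose local evaluation delegates to Breeze, plus a small `Column` wrapper (the `dot` helper shown is illustrative) so it can be called like a UDF. The exact set of methods the base class requires depends on the Spark version.

```scala
// Rough sketch only -- not the actual contents of DotProduct.scala, and the
// base-class contract may differ across Spark versions.
import breeze.linalg.DenseVector
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{DataType, DoubleType}

case class DotProduct(left: Expression, right: Expression)
    extends BinaryExpression with CodegenFallback {

  override def dataType: DataType = DoubleType

  // Called with the already-evaluated, non-null children: two double arrays.
  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
    val x = input1.asInstanceOf[ArrayData].toDoubleArray()
    val y = input2.asInstanceOf[ArrayData].toDoubleArray()
    new DenseVector(x) dot new DenseVector(y)
  }
}

// Illustrative wrapper so the expression can be used in DataFrame queries,
// e.g. df.select(dot($"features", $"weights")):
object dot {
  def apply(x: Column, y: Column): Column =
    new Column(DotProduct(x.expr, y.expr))
}
```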
Now we can port this UDF to Opaque as follows:
1. Define a corresponding expression using Opaque's expression serialization format by adding the following to [Expr.fbs](src/flatbuffers/Expr.fbs), which indicates that a `DotProduct` expression takes two inputs (the two double arrays):

   ```
   table DotProduct {
     left:Expr;
     right:Expr;
   }
   ```

2. In the same file, add `DotProduct` to the list of expressions in `ExprUnion`.

3. Implement the serialization logic from the Scala `DotProduct` UDF to the Opaque expression that we just defined. In `Utils.flatbuffersSerializeExpression` (from `Utils.scala`), add a case for `DotProduct` as follows:

   ```scala
   case (DotProduct(left, right), Seq(leftOffset, rightOffset)) =>
     tuix.Expr.createExpr(
       builder,
       tuix.ExprUnion.DotProduct,
       tuix.DotProduct.createDotProduct(
         builder, leftOffset, rightOffset))
   ```

4. Finally, implement the UDF in C++. In `FlatbuffersExpressionEvaluator#eval_helper` (from `ExpressionEvaluation.h`), add a case for `tuix::ExprUnion_DotProduct`. Within that case, cast the expression to a `tuix::DotProduct`, recursively evaluate the left and right children, perform the dot product computation on them, and construct a `DoubleField` containing the result.