Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public static ExpressionNode makeLiteral(Object obj, DataType dataType, Boolean
checkDecimalScale(decimal.scale());
return makeDecimalLiteral(decimal);
}
} else if (dataType instanceof ArrayType) {
} else if (dataType instanceof ArrayType) {
if (obj == null) {
ArrayType arrayType = (ArrayType)dataType;
return makeNullLiteral(TypeBuilder.makeList(nullable,
Expand All @@ -210,10 +210,22 @@ public static ExpressionNode makeLiteral(Object obj, DataType dataType, Boolean
}
return makeStringList(list);
}
} else if (dataType instanceof MapType) {
if (obj == null) {
MapType mapType = (MapType) dataType;
TypeNode keyType = ConverterUtils.getTypeNode(mapType.keyType(), false);
TypeNode valType = ConverterUtils.getTypeNode(mapType.valueType(),
mapType.valueContainsNull());
return makeNullLiteral(TypeBuilder.makeMap(nullable, keyType, valType));
} else {
throw new UnsupportedOperationException(
String.format("Type not supported: %s.", dataType.toString()));
}
} else {
/// TODO(taiyang-li) implement Literal Node for Struct/Array and non-null Map
throw new UnsupportedOperationException(
String.format("Type not supported: %s.", dataType.toString()));
String.format("Type not supported: %s, obj: %s, class: %s",
dataType.toString(), obj.toString(), obj.getClass().toString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package io.glutenproject.expression
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode}
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer
import com.google.common.collect.Lists

class CreateArrayTransformer(substraitExprName: String, children: Seq[ExpressionTransformer],
useStringTypeWhenEmpty: Boolean,
Expand All @@ -34,18 +37,19 @@ class CreateArrayTransformer(substraitExprName: String, children: Seq[Expression
with Logging {

override def doTransform(args: java.lang.Object): ExpressionNode = {
// If children is empty,
// transformation is only supported when useStringTypeWhenEmpty is false
// because ClickHouse and Velox currently doesn't support this config.
if (useStringTypeWhenEmpty && children.isEmpty) {
throw new UnsupportedOperationException(s"${original} not supported yet.")
}

val childNodes = new java.util.ArrayList[ExpressionNode]()
children.foreach(child => {
val childNode = child.doTransform(args)
childNodes.add(childNode)
})

// Only support transforming when useStringTypeWhenEmpty is false
// because ClickHouse and Velox currently don't support this feature.
if (useStringTypeWhenEmpty) {
throw new UnsupportedOperationException(s"${original} not supported yet.")
}

val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val functionName = ConverterUtils.makeFuncName(substraitExprName,
original.children.map(_.dataType), FunctionConfig.OPT)
Expand All @@ -54,3 +58,35 @@ class CreateArrayTransformer(substraitExprName: String, children: Seq[Expression
ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode)
}
}

class GetArrayItemTransformer(substraitExprName: String, left: ExpressionTransformer,
  right: ExpressionTransformer, failOnError: Boolean, original: GetArrayItem)
  extends ExpressionTransformer
  with Logging {

  /**
   * Transforms Spark's GetArrayItem into a Substrait scalar function node.
   *
   * Spark's GetArrayItem uses 0-based indexing, while the arrayElement
   * functions in the ClickHouse and Velox backends use 1-based indexing,
   * so the index child is rewritten as add(1, index) before being emitted.
   *
   * NOTE(review): failOnError is currently ignored for the ClickHouse backend.
   */
  override def doTransform(args: java.lang.Object): ExpressionNode = {
    val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
    val leftNode = left.doTransform(args)
    var rightNode = right.doTransform(args)

    // Shift the 0-based Spark index to the 1-based index expected by the
    // backends: rightNode = add(1, rightNode).
    val addFunctionName = ConverterUtils.makeFuncName(ExpressionMappings.ADD,
      Seq(IntegerType, original.right.dataType), FunctionConfig.OPT)
    val addFunctionId = ExpressionBuilder.newScalarFunction(functionMap, addFunctionName)
    // Literal 1 as a non-nullable IntegerType ("1.toInt" was a no-op).
    val literalNode = ExpressionBuilder.makeLiteral(1, IntegerType, false)
    rightNode = ExpressionBuilder.makeScalarFunction(addFunctionId,
      Lists.newArrayList(literalNode, rightNode),
      ConverterUtils.getTypeNode(original.right.dataType, original.right.nullable))

    val functionName = ConverterUtils.makeFuncName(substraitExprName,
      Seq(original.left.dataType, original.right.dataType), FunctionConfig.OPT)
    val functionId = ExpressionBuilder.newScalarFunction(functionMap, functionName)
    // leftNode and rightNode are already ExpressionNodes; no casts needed.
    val exprNodes = Lists.newArrayList(leftNode, rightNode)
    val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
    ExpressionBuilder.makeScalarFunction(functionId, exprNodes, typeNode)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ object ExpressionConverter extends Logging {
val children = c.children.map(child =>
replaceWithExpressionTransformer(child, attributeSeq))
new CreateArrayTransformer(substraitExprName.get, children, true, c)
case g: GetArrayItem =>
new GetArrayItemTransformer(
substraitExprName.get,
replaceWithExpressionTransformer(g.left, attributeSeq),
replaceWithExpressionTransformer(g.right, attributeSeq),
g.failOnError,
g)
case c: CreateMap =>
val children = c.children.map(child =>
replaceWithExpressionTransformer(child, attributeSeq))
new CreateMapTransformer(substraitExprName.get, children, c.useStringTypeWhenEmpty, c)
case g: GetMapValue =>
new GetMapValueTransformer(
substraitExprName.get,
replaceWithExpressionTransformer(g.child, attributeSeq),
replaceWithExpressionTransformer(g.key, attributeSeq),
g.failOnError,
g)
case e: Explode =>
new ExplodeTransformer(substraitExprName.get,
replaceWithExpressionTransformer(e.child, attributeSeq), e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ object ExpressionMappings {
// Array functions
final val SIZE = "size"
final val CREATE_ARRAY = "array"
final val GET_ARRAY_ITEM = "get_array_item"

// Map functions
final val CREATE_MAP = "map"
final val GET_MAP_VALUE = "get_map_value"

// Spark 3.3
final val SPLIT_PART = "split_part"
Expand Down Expand Up @@ -278,6 +283,10 @@ object ExpressionMappings {
Sig[Size](SIZE),
Sig[CreateArray](CREATE_ARRAY),
Sig[Explode](EXPLODE),
Sig[GetArrayItem](GET_ARRAY_ITEM),
// Map functions
Sig[CreateMap](CREATE_MAP),
Sig[GetMapValue](GET_MAP_VALUE),
// Directly use child expression transformer
Sig[KnownFloatingPointNormalized](KNOWN_FLOATING_POINT_NORMALIZED),
Sig[NormalizeNaNAndZero](NORMALIZE_NANAND_ZERO),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.glutenproject.expression

import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DataType
import io.glutenproject.backendsapi.BackendsApiManager

import scala.collection.mutable.ArrayBuffer
import com.google.common.collect.Lists
import io.glutenproject.GlutenConfig

class CreateMapTransformer(substraitExprName: String, children: Seq[ExpressionTransformer],
  useStringTypeWhenEmpty: Boolean, original: CreateMap)
  extends ExpressionTransformer
  with Logging {

  /**
   * Transforms Spark's CreateMap into a Substrait scalar function node.
   *
   * An empty CreateMap with useStringTypeWhenEmpty enabled would yield a
   * map&lt;string, string&gt;, which ClickHouse and Velox don't support yet, so
   * that combination is rejected.
   */
  override def doTransform(args: java.lang.Object): ExpressionNode = {
    if (children.isEmpty && useStringTypeWhenEmpty) {
      // Include the original expression in the message for debuggability,
      // consistent with CreateArrayTransformer.
      throw new UnsupportedOperationException(s"$original not supported yet.")
    }

    // Transform each key/value child into its Substrait node.
    val childNodes = new java.util.ArrayList[ExpressionNode]()
    children.foreach { child =>
      childNodes.add(child.doTransform(args))
    }

    val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
    val functionName = ConverterUtils.makeFuncName(substraitExprName,
      original.children.map(_.dataType), FunctionConfig.OPT)
    val functionId = ExpressionBuilder.newScalarFunction(functionMap, functionName)
    val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
    ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode)
  }
}

class GetMapValueTransformer(substraitExprName: String, child: ExpressionTransformer,
  key: ExpressionTransformer, failOnError: Boolean, original: GetMapValue)
  extends ExpressionTransformer
  with Logging {

  /**
   * Transforms Spark's GetMapValue into a Substrait scalar function node.
   *
   * Backend capabilities differ: ClickHouse cannot fail on a missing key,
   * while Velox always fails on a missing key, so only the matching
   * failOnError setting is accepted for each backend.
   */
  override def doTransform(args: java.lang.Object): ExpressionNode = {
    // ClickHouse backend doesn't support failing on error.
    if (BackendsApiManager.getBackendName.equalsIgnoreCase(
      GlutenConfig.GLUTEN_CLICKHOUSE_BACKEND) && failOnError) {
      throw new UnsupportedOperationException(
        s"$original with failOnError=true not supported by the ClickHouse backend yet.")
    }

    // Velox backend always fails on error.
    if (BackendsApiManager.getBackendName.equalsIgnoreCase(
      GlutenConfig.GLUTEN_VELOX_BACKEND) && !failOnError) {
      throw new UnsupportedOperationException(
        s"$original with failOnError=false not supported by the Velox backend yet.")
    }

    val childNode = child.doTransform(args)
    val keyNode = key.doTransform(args)

    val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
    val functionName = ConverterUtils.makeFuncName(substraitExprName,
      Seq(original.child.dataType, original.key.dataType), FunctionConfig.OPT)
    val functionId = ExpressionBuilder.newScalarFunction(functionMap, functionName)
    // childNode and keyNode are already ExpressionNodes; no casts needed.
    val exprNodes = Lists.newArrayList(childNode, keyNode)
    val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
    ExpressionBuilder.makeScalarFunction(functionId, exprNodes, typeNode)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package io.glutenproject.utils.clickhouse

import io.glutenproject.utils.BackendTestSettings
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GlutenArithmeticExpressionSuite
import org.apache.spark.sql.catalyst.expressions._

object ClickHouseTestSettings extends BackendTestSettings {

Expand Down Expand Up @@ -104,7 +104,7 @@ object ClickHouseTestSettings extends BackendTestSettings {
)

enableSuite[GlutenComplexTypesSuite]

enableSuite[GlutenComplexTypeSuite]
enableSuite[GlutenArithmeticExpressionSuite]
.exclude(
"- (UnaryMinus)",
Expand Down