
Commit a662ccd

Add streaming triggers other than ProcessingTime. (#7)
* Add streaming triggers other than ProcessingTime.
* Set the Kafka output topic as optional.

Authored-by: Guillaume LECLERC <guillaume.leclerc@amadeus.com>
1 parent 6dacc29 commit a662ccd

15 files changed: 452 additions & 116 deletions


core/src/main/scala/com/amadeus/dataio/config/ConfigNode.scala

Lines changed: 1 addition & 1 deletion
@@ -43,6 +43,6 @@ object ConfigNode {
 
     val name = Try(config.getString("Name")).getOrElse(s"$typeName-${randomUUID().toString}")
 
-    ConfigNode(name, typeName, config.withoutPath("Type").withoutPath("Name"))
+    ConfigNode(name, typeName, config.withoutPath("Type"))
   }
 }
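The practical effect: only the Type key is now stripped from a node's configuration, so Name stays readable downstream. A minimal sketch, assuming ConfigNode.apply accepts a Typesafe Config as the test further below suggests:

import com.typesafe.config.ConfigFactory
import com.amadeus.dataio.config.ConfigNode

val node = ConfigNode(ConfigFactory.parseString("Type = com.Entity, Name = my-entity, Field1 = 5"))
node.config.hasPath("Name") // true after this change; only Type is removed
node.config.hasPath("Type") // false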
core/src/main/scala/com/amadeus/dataio/config/fields/StreamingTriggerConfigurator.scala

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+package com.amadeus.dataio.config.fields
+
+import com.typesafe.config.Config
+import org.apache.spark.sql.streaming.Trigger
+
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+/**
+  * Retrieves the trigger to be used from the configuration.
+  *
+  * See documentation: [[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing]]
+  *
+  * Trigger = "AvailableNow" or "Continuous". Optional in case of a ProcessingTime trigger.
+  * Duration = "1 minute". Optional in case of an AvailableNow trigger.
+  */
+trait StreamingTriggerConfigurator {
+
+  /**
+    * @param config The typesafe Config object holding the configuration.
+    * @return the trigger, or None if it is not defined.
+    * @throws IllegalArgumentException in case the combination of Trigger and Duration is not supported.
+    */
+  def getStreamingTrigger(implicit config: Config): Option[Trigger] = {
+    val streamingTrigger = Try(config.getString("Trigger")).toOption
+    val duration         = Try(config.getString("Duration")).toOption
+
+    (streamingTrigger, duration) match {
+      case (Some("AvailableNow"), _)            => Some(Trigger.AvailableNow())
+      case (Some("Continuous"), Some(duration)) => Some(Trigger.Continuous(Duration(duration)))
+      case (None, Some(duration))               => Some(Trigger.ProcessingTime(Duration(duration)))
+      case (None, None)                         => None
+      case _                                    => throw new IllegalArgumentException(s"The combination ($streamingTrigger, $duration) is not among the allowed values")
+    }
+  }
+}
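To make the mapping concrete, here is a minimal sketch of how the (Trigger, Duration) combinations resolve; the object name is hypothetical, and Trigger.AvailableNow assumes Spark 3.3+:

import com.amadeus.dataio.config.fields.StreamingTriggerConfigurator
import com.typesafe.config.ConfigFactory

object TriggerResolutionSketch extends StreamingTriggerConfigurator {
  def main(args: Array[String]): Unit = {
    // Duration alone -> ProcessingTime trigger firing every minute.
    println(getStreamingTrigger(ConfigFactory.parseString("Duration = \"1 minute\"")))
    // AvailableNow processes everything available, then stops; Duration is ignored.
    println(getStreamingTrigger(ConfigFactory.parseString("Trigger = AvailableNow")))
    // Continuous requires a checkpoint interval, supplied through Duration.
    println(getStreamingTrigger(ConfigFactory.parseString("Trigger = Continuous, Duration = \"5 seconds\"")))
    // Neither key set -> None; outputs fall back to Spark's default trigger.
    println(getStreamingTrigger(ConfigFactory.parseString("")))
  }
}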

core/src/main/scala/com/amadeus/dataio/config/fields/package.scala

Lines changed: 1 addition & 0 deletions
@@ -14,3 +14,4 @@ package object fields
     with PartitionByConfigurator
     with TimeoutConfigurator
     with SchemaConfigurator
+    with StreamingTriggerConfigurator
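With the trait mixed into the package object, call sites need only the wildcard import already used by the outputs below:

import com.amadeus.dataio.config.fields._
import com.typesafe.config.ConfigFactory

val trigger = getStreamingTrigger(ConfigFactory.parseString("Trigger = AvailableNow"))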

core/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala

Lines changed: 13 additions & 12 deletions
@@ -2,19 +2,17 @@ package com.amadeus.dataio.pipes.elk.streaming
 
 import com.amadeus.dataio.core.{Logging, Output}
 import com.amadeus.dataio.pipes.elk.ElkOutputCommons
-import com.amadeus.dataio.pipes.elk.ElkOutputCommons.{DefaultSuffixDatePattern, checkNodesIsDefined, checkPortIsDefined}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{Dataset, SparkSession}
 
-import scala.concurrent.duration.Duration
 import scala.util.Try
 
 /**
   * Allows to write stream data to Elasticsearch with automatic date sub-indexing.
   *
   * @param index the Index to write to.
-  * @param processingTimeTrigger processingTimeTrigger.
+  * @param trigger the trigger to be used for the streaming query.
   * @param timeout timeout in milliseconds.
   * @param mode mode.
   * @param options options.
@@ -25,7 +23,7 @@ import scala.util.Try
   */
 case class ElkOutput(
     index: String,
-    processingTimeTrigger: Trigger,
+    trigger: Option[Trigger],
     timeout: Long,
     mode: String,
     options: Map[String, String] = Map.empty,
@@ -45,19 +43,22 @@ case class ElkOutput(
    */
   def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
     val fullIndexName = computeFullIndexName()
-    logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName]")
+    logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName] using trigger [$trigger]")
 
     val queryName = createQueryName()
 
-    val streamWriter = data.writeStream
+    var streamWriter = data.writeStream
       .queryName(queryName)
       .outputMode(mode)
       .format(Format)
       .options(options)
 
-    val streamingQuery = streamWriter
-      .trigger(processingTimeTrigger)
-      .start(fullIndexName)
+    streamWriter = trigger match {
+      case Some(trigger) => streamWriter.trigger(trigger)
+      case _             => streamWriter
+    }
+
+    val streamingQuery = streamWriter.start(fullIndexName)
 
     streamingQuery.awaitTermination(timeout)
     streamingQuery.stop()
@@ -81,6 +82,7 @@ case class ElkOutput(
 object ElkOutput {
   import com.amadeus.dataio.config.fields._
   import com.amadeus.dataio.pipes.elk.ElkConfigurator._
+  import com.amadeus.dataio.pipes.elk.ElkOutputCommons.{DefaultSuffixDatePattern, checkNodesIsDefined, checkPortIsDefined}
 
   /**
    * Creates an ElkOutput based on a given configuration.
@@ -94,8 +96,7 @@ object ElkOutput {
 
     val mode = config.getString("Mode")
 
-    val duration = Duration(config.getString("Duration"))
-    val processingTimeTrigger = Trigger.ProcessingTime(duration)
+    val trigger = getStreamingTrigger
 
     val timeout = getTimeout
 
@@ -112,7 +113,7 @@
 
     ElkOutput(
       index = index,
-      processingTimeTrigger = processingTimeTrigger,
+      trigger = trigger,
       timeout = timeout,
       mode = mode,
      options = options,

core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala

Lines changed: 23 additions & 18 deletions
@@ -5,15 +5,14 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{Dataset, SparkSession}
 
-import scala.concurrent.duration.Duration
 import scala.util.Try
 
 /**
   * Class for reading kafka dataframe
   *
   * @param brokers brokers
   * @param topic topic
-  * @param processingTimeTrigger processingTimeTrigger.
+  * @param trigger the trigger to be used for the streaming query.
   * @param timeout timeout in milliseconds.
   * @param mode mode.
   * @param options options
@@ -23,7 +22,7 @@ import scala.util.Try
 case class KafkaOutput(
     brokers: String,
     topic: Option[String],
-    processingTimeTrigger: Trigger,
+    trigger: Option[Trigger],
     timeout: Long,
     mode: String,
     options: Map[String, String] = Map(),
@@ -39,23 +38,30 @@ case class KafkaOutput(
    * @param spark The SparkSession which will be used to write the data.
    */
   def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
-    logger.info(s"Write dataframe to kafka [$topic]")
+    logger.info(s"Write dataframe to kafka [$topic] using trigger [$trigger]")
 
     val queryName = createQueryName()
 
-    val streamWriter = data.writeStream
+    var fullOptions = options + ("kafka.bootstrap.servers" -> brokers)
+
+    fullOptions = topic match {
+      case Some(kafkaTopic) => fullOptions + ("topic" -> kafkaTopic)
+      case _                => fullOptions
+    }
+
+    var streamWriter = data.writeStream
       .queryName(queryName)
       .format("kafka")
-      .options(options)
-      .option("kafka.bootstrap.servers", brokers)
+      .options(fullOptions)
       .outputMode(mode)
-      .trigger(processingTimeTrigger)
 
-    val streamingQuery = topic match {
-      case Some(t) => streamWriter.option("topic", t).start()
-      case _       => streamWriter.start()
-    }
+    streamWriter = trigger match {
+      case Some(trigger) => streamWriter.trigger(trigger)
+      case _             => streamWriter
+    }
+
+    val streamingQuery = streamWriter.start()
 
     streamingQuery.awaitTermination(timeout)
     streamingQuery.stop()
   }
@@ -68,10 +74,10 @@ case class KafkaOutput(
   private[streaming] def createQueryName(): String = {
 
     (outputName, topic) match {
-      case (Some(name), Some(t)) => s"QN_${name}_${t}_${java.util.UUID.randomUUID}"
-      case (Some(name), None)    => s"QN_${name}_${java.util.UUID.randomUUID}"
-      case (None, Some(t))       => s"QN_KafkaOutput_${t}_${java.util.UUID.randomUUID}"
-      case _                     => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
+      case (Some(name), Some(kafkaTopic)) => s"QN_${name}_${kafkaTopic}_${java.util.UUID.randomUUID}"
+      case (Some(name), None)             => s"QN_${name}_${java.util.UUID.randomUUID}"
+      case (None, Some(kafkaTopic))       => s"QN_${kafkaTopic}_${java.util.UUID.randomUUID}"
+      case _                              => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
     }
 
   }
@@ -92,8 +98,7 @@ object KafkaOutput {
     val brokers = getBroker
     val topic = getTopic
 
-    val duration = Duration(config.getString("Duration"))
-    val processingTimeTrigger = Trigger.ProcessingTime(duration)
+    val trigger = getStreamingTrigger
 
     val timeout = getTimeout
     val mode = config.getString("Mode")
@@ -104,7 +109,7 @@
     KafkaOutput(
       brokers,
       topic,
-      processingTimeTrigger,
+      trigger,
      timeout,
      mode,
      options,
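With the topic now optional, a configuration may omit it entirely; Spark's Kafka sink then expects each record to carry a topic column. A hypothetical configuration sketch, where the Brokers and Timeout key names and all values are assumptions based on the configurators used above:

import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical KafkaOutput configuration with no Topic key.
implicit val kafkaConf: Config = ConfigFactory.parseString("""
  Brokers = "broker-1:9092,broker-2:9092"
  Mode    = update
  Trigger = AvailableNow
  Timeout = 600000
""")
// val output = KafkaOutput(kafkaConf)
// Records written through this output must then include a "topic" column.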

core/src/main/scala/com/amadeus/dataio/pipes/storage/streaming/StorageOutput.scala

Lines changed: 14 additions & 10 deletions
@@ -6,7 +6,6 @@ import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{Dataset, SparkSession}
 
 import java.io.File
-import scala.concurrent.duration.Duration
 import scala.util.Try
 
 /**
@@ -15,7 +14,7 @@ import scala.util.Try
   * @param format the output format.
   * @param path the path.
   * @param partitioningColumns the columns to partition by.
-  * @param processingTimeTrigger processingTimeTrigger.
+  * @param trigger the trigger to be used for the streaming query.
   * @param timeout timeout in milliseconds.
   * @param mode mode.
   * @param options options.
@@ -26,7 +25,7 @@ case class StorageOutput(
     format: String,
     path: String,
     partitioningColumns: Seq[String],
-    processingTimeTrigger: Trigger,
+    trigger: Option[Trigger],
     timeout: Long,
     mode: String,
     options: Map[String, String] = Map(),
@@ -42,7 +41,7 @@ case class StorageOutput(
    * @param spark The SparkSession which will be used to write the data.
    */
   def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
-    logger.info(s"Write dataframe to storage [$path]")
+    logger.info(s"Write dataframe to storage [$path] using trigger [$trigger]")
 
     val queryName = createQueryName()
 
@@ -57,9 +56,12 @@
       streamWriter = streamWriter.partitionBy(partitioningColumns: _*)
     }
 
-    val streamingQuery = streamWriter
-      .trigger(processingTimeTrigger)
-      .start(path)
+    streamWriter = trigger match {
+      case Some(trigger) => streamWriter.trigger(trigger)
+      case _             => streamWriter
+    }
+
+    val streamingQuery = streamWriter.start(path)
 
     streamingQuery.awaitTermination(timeout)
     streamingQuery.stop()
@@ -73,6 +75,8 @@ case class StorageOutput(
   private[streaming] def createQueryName(): String = {
     val directory = Try { path.split(File.separatorChar).reverse.head }.toOption
 
+    logger.info(s"CreateQueryName based on $directory and $outputName.")
+
     val queryName: String = (directory, outputName) match {
       case (Some(directoryName), Some(name)) => s"QN_${name}_${directoryName}_${java.util.UUID.randomUUID}"
       case (Some(directoryName), _)          => s"QN_${directoryName}_${java.util.UUID.randomUUID}"
@@ -93,6 +97,7 @@ object StorageOutput {
    * @return a new instance of StorageOutput.
    */
   def apply(implicit config: Config): StorageOutput = {
+
     val format = config.getString("Format")
     val path = getPath
 
@@ -102,8 +107,7 @@
 
     val mode = config.getString("Mode")
 
-    val duration = Duration(config.getString("Duration"))
-    val processingTimeTrigger = Trigger.ProcessingTime(duration)
+    val trigger = getStreamingTrigger
 
     val timeout = getTimeout
 
@@ -113,7 +117,7 @@
     format,
     path,
     partitioningColumns,
-    processingTimeTrigger,
+    trigger,
    timeout,
    mode,
    options,
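For illustration, a StorageOutput configuration exercising the new AvailableNow trigger might look as follows; Format, Mode and Trigger appear in the code above, while the Path and Timeout key names and all values are assumptions:

import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical batch-like run: process all available data, then stop.
implicit val storageConf: Config = ConfigFactory.parseString("""
  Format  = delta
  Path    = "/data/output/events"
  Mode    = append
  Trigger = AvailableNow
  Timeout = 3600000
""")
// val output = StorageOutput(storageConf)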

core/src/test/scala/com/amadeus/dataio/config/ConfigNodeTest.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ class ConfigNodeTest extends AnyWordSpec with Matchers {
       result.name shouldBe "my-entity"
       result.typeName shouldBe "com.Entity"
       result.config.getInt("Field1") shouldBe 5
-      result.config.withoutPath("Field1").isEmpty shouldBe true
+      result.config.withoutPath("Field1").withoutPath("Name").isEmpty shouldBe true
     }
   }
5555
