diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index 4c969f1bbefd0..c687c7f01ed7a 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -49,7 +49,10 @@ abstract class GraphExecution(
 
   /** Increments flow execution retry count for `flow`. */
   private def incrementFlowToNumConsecutiveFailure(flowIdentifier: TableIdentifier): Unit = {
-    flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1)
+    flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+      case Some(count) => Some(count + 1)
+      case None => Some(1)
+    }
   }
 
   /**
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
index 4aaa139378b93..4fcd9dad93fe7 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableC
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
 import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
-import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.logging.{EventLevel, FlowProgress}
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -1027,4 +1027,33 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession
       }
     )
   }
+
+  test("consecutive failure event level is correct") {
+    val session = spark
+    import session.implicits._
+
+    val pipelineDef = new TestGraphRegistrationContext(spark) {
+      registerMaterializedView(
+        "retry_test",
+        partitionCols = Some(Seq("nonexistent_col")),
+        query = dfFlowFunc(spark.range(5).withColumn("id_mod", ($"id" % 2).cast("int")))
+      )
+    }
+
+    val graph = pipelineDef.toDataflowGraph
+    val updateContext = TestPipelineUpdateContext(spark, graph)
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val failedEvents = updateContext.eventBuffer.getEvents.filter { e =>
+      e.details.isInstanceOf[FlowProgress] &&
+      e.details.asInstanceOf[FlowProgress].status == FlowStatus.FAILED
+    }
+
+    val warnCount = failedEvents.count(_.level == EventLevel.WARN)
+    // flowToNumConsecutiveFailure determines the level: only the final failed attempt is ERROR
+    val errorCount = failedEvents.count(_.level == EventLevel.ERROR)
+
+    assert(warnCount == 2 && errorCount == 1)
+  }
 }
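
Review note (not part of the patch): the fix relies on `scala.collection.mutable.Map.updateWith` (available since Scala 2.13), which makes the absent-key case explicit, whereas the old apply-style read `flowToNumConsecutiveFailure(flowIdentifier)` throws `NoSuchElementException` for a flow with no recorded failures, assuming the map has no default value. Below is a minimal, self-contained sketch of the pattern; `failures` and `increment` are hypothetical stand-ins for `flowToNumConsecutiveFailure` and `incrementFlowToNumConsecutiveFailure`:

```scala
import scala.collection.mutable

object ConsecutiveFailureSketch {
  // Hypothetical stand-in for flowToNumConsecutiveFailure, assuming a plain
  // mutable.Map with no default value for absent keys.
  private val failures = mutable.Map.empty[String, Int]

  // Mirrors the patched incrementFlowToNumConsecutiveFailure: a single
  // updateWith call covers both the first failure and every subsequent one.
  // The old failures.put(flow, failures(flow) + 1) read the current value
  // with apply, which throws NoSuchElementException when the key is absent.
  def increment(flow: String): Unit = failures.updateWith(flow) {
    case Some(count) => Some(count + 1) // later failures: bump the counter
    case None => Some(1)                // first failure: start the count at 1
  }

  def main(args: Array[String]): Unit = {
    increment("flowA")
    increment("flowA")
    println(failures("flowA")) // prints 2
  }
}
```

One detail of `updateWith` worth noting: returning `None` from the remapping function removes the key. The patched code always returns `Some`, so the entry is only ever created or incremented, never dropped.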