Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ object DynamicBrokerConfig {
}
}
val configHandler = new BrokerConfigHandler(config, quotaManagers)
configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
configHandler.processConfigChanges("", dynamicDefaultConfigs)
configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// wait for MetricsReporter
val reporters = TestMetricsReporter.waitForReporters(servers.size)
reporters.foreach { reporter =>
reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000)
reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000, numFetcher = 2)
assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
}

Expand All @@ -330,15 +330,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
alterConfigsUsingConfigCommand(updatedProps)
waitForConfig(TestMetricsReporter.PollingIntervalProp, "1000")
reporters.foreach { reporter =>
reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000)
reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000, numFetcher = 2)
}

// 2. verify update occurring if the value of property changed.
updatedProps.put(TestMetricsReporter.PollingIntervalProp, PollingIntervalUpdateVal)
alterConfigsUsingConfigCommand(updatedProps)
waitForConfig(TestMetricsReporter.PollingIntervalProp, "2000")
reporters.foreach { reporter =>
reporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 2000)
reporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 2000, numFetcher = 2)
}
}

Expand Down Expand Up @@ -947,6 +947,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// Add a new metrics reporter
val newProps = new Properties
newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
newProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change all verifyState calls in this method to numFetcher = 1? It's easier to understand, IMO.
Like reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100, numFetcher=1))

configureMetricsReporters(Seq(classOf[JmxReporter], classOf[TestMetricsReporter]), newProps)

val reporters = TestMetricsReporter.waitForReporters(servers.size + controllerServers.size)
Expand Down Expand Up @@ -1102,13 +1103,25 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
val props = defaultStaticConfig(numServers)
props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[TestMetricsReporter].getName)
props.put(TestMetricsReporter.PollingIntervalProp, "1000")

val kafkaConfig = KafkaConfig.fromProps(props)
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
servers += newBroker

alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal))

// Add num.replica.fetchers to the cluster-level config.
val clusterLevelProps = new Properties
clusterLevelProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "3")
reconfigureServers(clusterLevelProps, perBrokerConfig = false,
(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "3"))

// Wait for the metrics reporter to be configured
val initialReporter = TestMetricsReporter.waitForReporters(1).head
initialReporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 1000, numFetcher = 3)

TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)

TestUtils.waitUntilTrue(
Expand All @@ -1121,11 +1134,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
newBroker.shutdown()
newBroker.awaitShutdown()

// Clean up the test reporter
TestMetricsReporter.testReporters.clear()

val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS, listenerPrefix(SecureExternal)))
newBroker.config.updateCurrentConfig(KafkaConfig.fromProps(invalidStaticConfigs))

newBroker.startup()

// Verify that the custom MetricsReporter is not reconfigured after restart.
// If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter should maintain its state.
val reporterAfterRestart = TestMetricsReporter.waitForReporters(1).head
reporterAfterRestart.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 1000, numFetcher = 3)
}

private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
Expand Down Expand Up @@ -1568,6 +1589,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
@volatile var closeCount = 0
@volatile var clusterUpdateCount = 0
@volatile var pollingInterval: Int = -1
@volatile var numFetchers: Int = 1
Copy link
Collaborator

@Yunyung Yunyung Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe changing it to = 0 makes it clearer that it must be changed statically/dynamically.

Copy link
Collaborator

@Yunyung Yunyung Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore this.
For this reason: https://issues.apache.org/jira/browse/KAFKA-19645 , it is probably more reasonable to keep it as 1 instead of setting invalid value.

testReporters.add(this)

override def contextChange(metricsContext: MetricsContext): Unit = {
Expand All @@ -1584,6 +1606,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
configuredBrokers += configs.get(ServerConfigs.BROKER_ID_CONFIG).toString.toInt
configureCount += 1
pollingInterval = configs.get(PollingIntervalProp).toString.toInt
numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
}

override def metricChange(metric: KafkaMetric): Unit = {
Expand All @@ -1599,31 +1622,37 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
}

override def reconfigurableConfigs(): util.Set[String] = {
util.Set.of(PollingIntervalProp)
util.Set.of(PollingIntervalProp, ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for your help. Could you please consider creating an individual metrics reporter for the new test? Including too many test cases in TestMetricsReporter could make it harder to understand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add another TestNumReplicaFetcherMetricsReporter for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert the change of TestMetricsReporter

}

override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val pollingInterval = configs.get(PollingIntervalProp).toString.toInt
if (pollingInterval <= 0)
throw new ConfigException(s"Invalid polling interval $pollingInterval")

val numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
if (numFetchers <= 0)
throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
}

override def reconfigure(configs: util.Map[String, _]): Unit = {
reconfigureCount += 1
pollingInterval = configs.get(PollingIntervalProp).toString.toInt
numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
}

override def close(): Unit = {
closeCount += 1
}

def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int): Unit = {
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int = 1): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int = 1): Unit = {
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int): Unit = {

assertEquals(1, initializeCount)
assertEquals(1, configureCount)
assertEquals(reconfigureCount, this.reconfigureCount)
assertEquals(deleteCount, closeCount)
assertEquals(1, clusterUpdateCount)
assertEquals(pollingInterval, this.pollingInterval)
assertEquals(numFetcher, this.numFetchers)
}

def verifyMetricValue(name: String, group: String): Unit = {
Expand Down