Skip to content

Conversation

jim0987795064
Copy link
Contributor

@jim0987795064 jim0987795064 commented Aug 25, 2025

  • Changes: Replace misused dynamicPerBrokerConfigs with
    dynamicDefaultConfigs
  • Reasons: KRaft servers don't handle the cluser-level configs in
    starting

from: https://github.com/apache/kafka/pull/18949/files#r2296809389

Reviewers: Jun Rao junrao@gmail.com, Jhen-Yung Hsu
jhenyunghsu@gmail.com, PoAn Yang payang@apache.org, Chia-Ping Tsai
chia7712@gmail.com

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jim0987795064 : Thanks for the PR. A couple of comments. Also, could we add a test case?

@@ -231,15 +230,15 @@ object DynamicBrokerConfig {
if (configRecord.resourceName().isEmpty) {
putOrRemoveIfNull(dynamicDefaultConfigs, configRecord.name(), configRecord.value())
} else if (configRecord.resourceName() == config.brokerId.toString) {
putOrRemoveIfNull(dynamicPerBrokerConfigs, configRecord.name(), configRecord.value())
putOrRemoveIfNull(dynamicDefaultConfigs, configRecord.name(), configRecord.value())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this doesn't look right. If configRecord.resourceName() == config.brokerId.toString, it's a broker level config and it seems that we should use dynamicPerBrokerConfigs.

configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
configHandler.processConfigChanges("", dynamicDefaultConfigs)
configHandler.processConfigChanges(config.brokerId.toString, dynamicDefaultConfigs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we should continue to use dynamicPerBrokerConfigs here?

Copy link
Contributor Author

@jim0987795064 jim0987795064 Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao
Thanks for pointing this out. I've corrected the misused dynamicDefaultConfigs and now use dynamicPerBrokerConfigs for broker-scoped updates.

@jim0987795064
Copy link
Contributor Author

jim0987795064 commented Aug 25, 2025

@jim0987795064 : Thanks for the PR. A couple of comments. Also, could we add a test case?

Thanks for the feedback. I'm still working on adding the test case and will update the PR once it's ready.

@chia7712
Copy link
Member

Thanks for the feedback. I'm still working on adding the test case and will update the PR once it's ready.

Perhaps we could enhance testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs to cover this case.

  1. add a custom MetricsReporter to the newBroker. the MetricsReporter must rely on a dynamic server configuration, such as num.replica.fetchers
  2. add num.replica.fetchers to the cluster-level config
  3. if readDynamicBrokerConfigsFromSnapshot works correctly, the custom MetricsReporter should not be reconfigured after a restart.

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 requested a review from chia7712 August 26, 2025 06:29
@@ -1599,31 +1622,37 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
}

override def reconfigurableConfigs(): util.Set[String] = {
util.Set.of(PollingIntervalProp)
util.Set.of(PollingIntervalProp, ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for your help. Could you please consider creating an individual metrics reporter for the new test? Including too many test cases in TestMetricsReporter could make it harder to understand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add another TestNumReplicaFetcherMetricsReporter for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert the change of TestMetricsReporter

Copy link
Collaborator

@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. One nitpick,

@@ -1568,6 +1589,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
@volatile var closeCount = 0
@volatile var clusterUpdateCount = 0
@volatile var pollingInterval: Int = -1
@volatile var numFetchers: Int = 1
Copy link
Collaborator

@Yunyung Yunyung Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe changing it to = 0 makes it clearer that it must be changed statically/dynamically.

Copy link
Collaborator

@Yunyung Yunyung Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore this.
For this reason: https://issues.apache.org/jira/browse/KAFKA-19645 , it is probably more reasonable to keep it as 1 instead of setting invalid value.

}

override def close(): Unit = {
closeCount += 1
}

def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int): Unit = {
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int = 1): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int = 1): Unit = {
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int): Unit = {

@@ -947,6 +947,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// Add a new metrics reporter
val newProps = new Properties
newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
newProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change all verifyState calls in this method to numFetcher = 1? It's easier to understand, IMO.
Like reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100, numFetcher=1))

Signed-off-by: PoAn Yang <payang@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jim0987795064 : Thanks for the updated PR. Just a minor comment.

@@ -1634,6 +1655,71 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
}
}

object TestNumReplicaFetcherMetricsReporter {
val testReporters = new ConcurrentLinkedQueue[TestNumReplicaFetcherMetricsReporter]()
val configuredBrokers = mutable.Set[Int]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this being used?

}


class TestNumReplicaFetcherMetricsReporter extends MetricsReporter with Reconfigurable with Closeable with ClusterResourceListener {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class TestNumReplicaFetcherMetricsReporter extends MetricsReporter

@volatile var numFetchers: Int = 1
testReporters.add(this)

override def contextChange(metricsContext: MetricsContext): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to override the method with empty body

override def metricRemoval(metric: KafkaMetric): Unit = {
}

override def onUpdate(clusterResource: ClusterResource): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 8de88db into apache:4.1 Aug 27, 2025
16 of 20 checks passed
chia7712 pushed a commit that referenced this pull request Aug 27, 2025
#20405)

- **Changes**: Replace misused dynamicPerBrokerConfigs with
dynamicDefaultConfigs
- **Reasons**: KRaft servers don't handle the cluser-level configs in
starting

from: https://github.com/apache/kafka/pull/18949/files#r2296809389

Reviewers: Jun Rao <junrao@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>

---------

Co-authored-by: PoAn Yang <payang@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants