Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tiny-keccak = "1.5.0"
hex = "0.4.3"
semver = "1.0.26"
thiserror = { workspace = true }
tokio = { version = "1", features = ["full"] }

itertools = "0.14.0"

Expand Down
71 changes: 71 additions & 0 deletions chain/ethereum/src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::adapter::EthereumAdapter as EthereumAdapterTrait;
use crate::EthereumAdapter;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::time::sleep;
#[derive(Debug)]
pub struct Health {
pub provider: Arc<EthereumAdapter>,
latency: Arc<RwLock<Duration>>,
error_rate: Arc<RwLock<f64>>,
consecutive_failures: Arc<RwLock<u32>>,
}

impl Health {
pub fn new(provider: Arc<EthereumAdapter>) -> Self {
Self {
provider,
latency: Arc::new(RwLock::new(Duration::from_secs(0))),
error_rate: Arc::new(RwLock::new(0.0)),
consecutive_failures: Arc::new(RwLock::new(0)),
}
}

pub fn provider(&self) -> &str {
self.provider.provider()
}

pub async fn check(&self) {
let start_time = Instant::now();
// For now, we'll just simulate a health check.
// In a real implementation, we would send a request to the provider.
let success = self.provider.provider().contains("rpc1"); // Simulate a failure for rpc2
let latency = start_time.elapsed();

self.update_metrics(success, latency);
}

fn update_metrics(&self, success: bool, latency: Duration) {
let mut latency_w = self.latency.write().unwrap();
*latency_w = latency;

let mut error_rate_w = self.error_rate.write().unwrap();
let mut consecutive_failures_w = self.consecutive_failures.write().unwrap();

if success {
*error_rate_w = *error_rate_w * 0.9; // Decay the error rate
*consecutive_failures_w = 0;
} else {
*error_rate_w = *error_rate_w * 0.9 + 0.1; // Increase the error rate
*consecutive_failures_w += 1;
}
}

pub fn score(&self) -> f64 {
let latency = *self.latency.read().unwrap();
let error_rate = *self.error_rate.read().unwrap();
let consecutive_failures = *self.consecutive_failures.read().unwrap();

// This is a simple scoring algorithm. A more sophisticated algorithm could be used here.
1.0 / (1.0 + latency.as_secs_f64() + error_rate + (consecutive_failures as f64))
}
}

pub async fn health_check_task(health_checkers: Vec<Arc<Health>>) {
loop {
for health_checker in &health_checkers {
health_checker.check().await;
}
sleep(Duration::from_secs(10)).await;
}
}
1 change: 1 addition & 0 deletions chain/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod codec;
mod data_source;
mod env;
mod ethereum_adapter;
pub mod health;
mod ingestor;
mod polling_block_stream;
pub mod runtime;
Expand Down
56 changes: 46 additions & 10 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub const DEFAULT_ADAPTER_ERROR_RETEST_PERCENT: f64 = 0.2;
pub struct EthereumNetworkAdapter {
endpoint_metrics: Arc<EndpointMetrics>,
pub capabilities: NodeCapabilities,
adapter: Arc<EthereumAdapter>,
pub adapter: Arc<EthereumAdapter>,
/// The maximum number of times this adapter can be used. We use the
/// strong_count on `adapter` to determine whether the adapter is above
/// that limit. That's a somewhat imprecise but convenient way to
Expand Down Expand Up @@ -86,6 +86,8 @@ impl EthereumNetworkAdapter {
}
}

use crate::health::Health;

#[derive(Debug, Clone)]
pub struct EthereumNetworkAdapters {
chain_id: ChainName,
Expand All @@ -94,6 +96,7 @@ pub struct EthereumNetworkAdapters {
// Percentage of request that should be used to retest errored adapters.
retest_percent: f64,
weighted: bool,
health_checkers: Vec<Arc<Health>>,
}

impl EthereumNetworkAdapters {
Expand All @@ -104,6 +107,7 @@ impl EthereumNetworkAdapters {
call_only_adapters: vec![],
retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT,
weighted: false,
health_checkers: vec![],
}
}

Expand All @@ -130,7 +134,7 @@ impl EthereumNetworkAdapters {
ProviderCheckStrategy::MarkAsValid,
);

Self::new(chain_id, provider, call_only, None, false)
Self::new(chain_id, provider, call_only, None, false, vec![])
}

pub fn new(
Expand All @@ -139,6 +143,7 @@ impl EthereumNetworkAdapters {
call_only_adapters: Vec<EthereumNetworkAdapter>,
retest_percent: Option<f64>,
weighted: bool,
health_checkers: Vec<Arc<Health>>,
) -> Self {
#[cfg(debug_assertions)]
call_only_adapters.iter().for_each(|a| {
Expand All @@ -151,6 +156,7 @@ impl EthereumNetworkAdapters {
call_only_adapters,
retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT),
weighted,
health_checkers,
}
}

Expand Down Expand Up @@ -267,7 +273,19 @@ impl EthereumNetworkAdapters {
required_capabilities
));
}
let weights: Vec<_> = input.iter().map(|a| a.weight).collect();

let weights: Vec<_> = input
.iter()
.map(|a| {
let health_checker = self
.health_checkers
.iter()
.find(|h| h.provider() == a.provider());
let score = health_checker.map_or(1.0, |h| h.score());
a.weight * score
})
.collect();

if let Ok(dist) = WeightedIndex::new(&weights) {
let idx = dist.sample(&mut rand::rng());
Ok(input[idx].adapter.clone())
Expand Down Expand Up @@ -382,6 +400,7 @@ impl EthereumNetworkAdapters {

#[cfg(test)]
mod tests {
use super::Health;
use graph::cheap_clone::CheapClone;
use graph::components::network_provider::ProviderCheckStrategy;
use graph::components::network_provider::ProviderManager;
Expand Down Expand Up @@ -842,10 +861,17 @@ mod tests {
vec![],
Some(0f64),
false,
vec![],
);

let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64), false);
let always_retest_adapters = EthereumNetworkAdapters::new(
chain_id,
manager.clone(),
vec![],
Some(1f64),
false,
vec![],
);

assert_eq!(
no_retest_adapters
Expand Down Expand Up @@ -937,6 +963,7 @@ mod tests {
vec![],
Some(1f64),
false,
vec![],
);

assert_eq!(
Expand All @@ -960,8 +987,14 @@ mod tests {
ProviderCheckStrategy::MarkAsValid,
);

let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64), false);
let no_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager,
vec![],
Some(0f64),
false,
vec![],
);
assert_eq!(
no_retest_adapters
.cheapest_with(&NodeCapabilities {
Expand Down Expand Up @@ -1003,7 +1036,7 @@ mod tests {
);

let no_available_adapter =
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false);
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false, vec![]);
let res = no_available_adapter
.cheapest_with(&NodeCapabilities {
archive: true,
Expand Down Expand Up @@ -1077,7 +1110,7 @@ mod tests {
.await,
);

let adapters = EthereumNetworkAdapters::for_testing(
let mut adapters = EthereumNetworkAdapters::for_testing(
vec![
EthereumNetworkAdapter::new(
metrics.cheap_clone(),
Expand All @@ -1104,7 +1137,10 @@ mod tests {
)
.await;

let mut adapters = adapters;
let health_checker1 = Arc::new(Health::new(adapter1.clone()));
let health_checker2 = Arc::new(Health::new(adapter2.clone()));

adapters.health_checkers = vec![health_checker1.clone(), health_checker2.clone()];
adapters.weighted = true;

let mut adapter1_count = 0;
Expand Down
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prometheus = { version = "0.14.0", features = ["push"] }
json-structural-diff = { version = "0.2", features = ["colorize"] }
globset = "0.4.16"
notify = "8.0.0"
tokio = { version = "1.47.1", features = ["full"] }

[target.'cfg(unix)'.dependencies]
pgtemp = { git = "https://github.com/graphprotocol/pgtemp", branch = "initdb-args" }
15 changes: 15 additions & 0 deletions node/src/network_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ impl AdapterConfiguration {
}
}

use graph_chain_ethereum::health::{health_check_task, Health};

pub struct Networks {
pub adapters: Vec<AdapterConfiguration>,
pub rpc_provider_manager: ProviderManager<EthereumNetworkAdapter>,
pub firehose_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
pub substreams_provider_manager: ProviderManager<Arc<FirehoseEndpoint>>,
pub weighted_rpc_steering: bool,
pub health_checkers: Vec<Arc<Health>>,
}

impl Networks {
Expand All @@ -132,6 +135,7 @@ impl Networks {
ProviderCheckStrategy::MarkAsValid,
),
weighted_rpc_steering: false,
health_checkers: vec![],
}
}

Expand Down Expand Up @@ -293,6 +297,15 @@ impl Networks {
},
);

let health_checkers: Vec<_> = eth_adapters
.clone()
.flat_map(|(_, adapters)| adapters)
.map(|adapter| Arc::new(Health::new(adapter.adapter.clone())))
.collect();
if weighted_rpc_steering {
tokio::spawn(health_check_task(health_checkers.clone()));
}

let firehose_adapters = adapters
.iter()
.flat_map(|a| a.as_firehose())
Expand Down Expand Up @@ -341,6 +354,7 @@ impl Networks {
ProviderCheckStrategy::RequireAll(provider_checks),
),
weighted_rpc_steering,
health_checkers,
};

s
Expand Down Expand Up @@ -455,6 +469,7 @@ impl Networks {
eth_adapters,
None,
self.weighted_rpc_steering,
self.health_checkers.clone(),
)
}
}