1
pub mod types;
2
#[macro_use]
3
pub mod observe;
4
mod ingest;
5

            
6
pub mod config;
7
mod risc0_runner;
8
mod transaction_sender;
9
use {
10
    anyhow::Result,
11
    bonsol_prover::input_resolver::DefaultInputResolver,
12
    config::*,
13
    ingest::{GrpcIngester, Ingester, RpcIngester},
14
    metrics::counter,
15
    metrics_exporter_prometheus::PrometheusBuilder,
16
    observe::MetricEvents,
17
    risc0_runner::Risc0Runner,
18
    rlimit::Resource,
19
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
20
    solana_sdk::{pubkey::Pubkey, signature::read_keypair_file, signer::Signer},
21
    std::{process::exit, str::FromStr, sync::Arc, time::Duration},
22
    thiserror::Error,
23
    tokio::{select, signal},
24
    tracing::{error, info},
25
    tracing_subscriber,
26
    transaction_sender::{rpc::RpcTransactionSender, transaction_sender::TransactionSender},
27
};
28

            
29
#[derive(Error, Debug)]
30
pub enum CliError {
31
    #[error("Invalid RPC URL")]
32
    InvalidRpcUrl,
33
    #[error("Invalid Bonsol program")]
34
    InvalidBonsolProgram,
35
    #[allow(dead_code)]
36
    #[error("Invalid RISC0 image folder")]
37
    InvalidRisc0ImageFolder,
38
    #[error("Invalid signer: Missing/Invalid")]
39
    InvalidSigner,
40
    #[error("Invalid Ingester")]
41
    InvalidIngester,
42
    #[error("Invalid Transaction Sender")]
43
    InvalidTransactionSender,
44
}
45

            
46
async fn node(config: ProverNodeConfig) -> Result<()> {
47
    let program = Pubkey::from_str(&config.bonsol_program)?;
48
    //todo use traits for signer
49
    let signer = match config.signer_config.clone() {
50
        SignerConfig::KeypairFile { path } => {
51
            info!("Using Keypair File");
52
            read_keypair_file(&path).map_err(|_| CliError::InvalidSigner)?
53
        }
54
    };
55
    let signer_identity = signer.pubkey();
56
    //Todo traitify ingester
57
    let mut ingester: Box<dyn Ingester> = match config.ingester_config.clone() {
58
        IngesterConfig::RpcBlockSubscription { wss_rpc_url } => {
59
            info!("Using RPC Block Subscription");
60
            Box::new(RpcIngester::new(wss_rpc_url))
61
        }
62
        IngesterConfig::GrpcSubscription {
63
            grpc_url,
64
            token,
65
            connection_timeout_secs,
66
            timeout_secs,
67
        } => {
68
            info!("Using GRPC Subscription");
69
            Box::new(GrpcIngester::new(
70
                grpc_url,
71
                token,
72
                Some(connection_timeout_secs),
73
                Some(timeout_secs),
74
            ))
75
        }
76
        _ => return Err(CliError::InvalidIngester.into()),
77
    };
78
    let (mut transaction_sender, solana_rpc_client) = match config.transaction_sender_config.clone()
79
    {
80
        TransactionSenderConfig::Rpc { rpc_url } => (
81
            RpcTransactionSender::new(rpc_url.clone(), program, signer),
82
            RpcClient::new(rpc_url),
83
        ),
84
        _ => return Err(CliError::InvalidRpcUrl.into()),
85
    };
86
    transaction_sender.start();
87
    let input_resolver = DefaultInputResolver::new_with_opts(
88
        Arc::new(reqwest::Client::new()),
89
        Arc::new(solana_rpc_client),
90
        Some(config.max_input_size_mb),
91
        Some(Duration::from_secs(
92
            config.image_download_timeout_secs as u64,
93
        )),
94
    );
95
    //may take time to load images, depending on the number of images TODO put limit
96
    let mut runner = Risc0Runner::new(
97
        config.clone(),
98
        signer_identity,
99
        Arc::new(transaction_sender),
100
        Arc::new(input_resolver),
101
    )
102
    .await?;
103
    let runner_chan = runner.start()?;
104
    let mut ingester_chan = ingester.start(program)?;
105
    let ingestor = tokio::spawn(async move {
106
        while let Some(bix) = ingester_chan.recv().await {
107
            for ix in bix {
108
                println!("Sending to runner");
109
                runner_chan.send(ix).unwrap();
110
            }
111
        }
112
    });
113
    select! {
114
        e = ingestor => {
115
            info!("Runner exited: {:?}", e);
116
            ingester.stop()?;
117
            runner.stop()?;
118
        },
119
    }
120
    Ok(())
121
}
122

            
123
#[tokio::main]
124
async fn main() -> Result<()> {
125
    // Set the stack size to unlimited
126
    match rlimit::setrlimit(Resource::STACK, u64::MAX, u64::MAX) {
127
        Ok(_) => {}
128
        Err(e) => error!("Error setting rlimit: {}", e),
129
    }
130
    tracing_subscriber::fmt()
131
        .json()
132
        .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
133
        .init();
134
    let args: Vec<String> = std::env::args().collect();
135
    if args.len() < 3 || args[1] != "-f" {
136
        error!("Usage: bonsol-node -f <config_file>");
137
        return Ok(());
138
    }
139
    let config_file = &args[2];
140
    let config = config::load_config(config_file);
141
    if let MetricsConfig::Prometheus {} = config.metrics_config {
142
        let builder = PrometheusBuilder::new();
143
        builder
144
            .install()
145
            .expect("failed to install prometheus exporter");
146
        info!("Prometheus exporter installed");
147
    }
148
    emit_event!(MetricEvents::BonsolStartup, up => true);
149
    node(config).await?;
150
    select! {
151
        _ = signal::ctrl_c() => {
152
            info!("Received Ctrl-C");
153
            info!("Exited");
154
            exit(1);
155
        },
156
    }
157
}