1
pub mod types;
2
#[macro_use]
3
pub mod observe;
4
mod ingest;
5

            
6
pub mod config;
7
mod risc0_runner;
8
mod transaction_sender;
9
use {
10
    anyhow::Result,
11
    bonsol_prover::input_resolver::DefaultInputResolver,
12
    config::*,
13
    ingest::{GrpcIngester, Ingester, RpcIngester},
14
    metrics::counter,
15
    metrics_exporter_prometheus::PrometheusBuilder,
16
    observe::MetricEvents,
17
    risc0_runner::Risc0Runner,
18
    rlimit::Resource,
19
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
20
    solana_sdk::{pubkey::Pubkey, signature::read_keypair_file, signer::Signer},
21
    std::{process::exit, str::FromStr, sync::Arc, time::Duration},
22
    thiserror::Error,
23
    tokio::{select, signal},
24
    tracing::{error, info},
25
    tracing_subscriber,
26
    transaction_sender::{RpcTransactionSender, TransactionSender},
27
};
28

            
29
#[derive(Error, Debug)]
30
pub enum CliError {
31
    #[error("Invalid RPC URL")]
32
    InvalidRpcUrl,
33
    #[error("Invalid Bonsol program")]
34
    InvalidBonsolProgram,
35
    #[allow(dead_code)]
36
    #[error("Invalid RISC0 image folder")]
37
    InvalidRisc0ImageFolder,
38
    #[error("Invalid signer: Missing/Invalid")]
39
    InvalidSigner,
40
    #[error("Invalid Ingester")]
41
    InvalidIngester,
42
    #[error("Invalid Transaction Sender")]
43
    InvalidTransactionSender,
44
}
45

            
46
#[tokio::main]
47
async fn main() -> Result<()> {
48
    // Set the stack size to unlimited
49
    match rlimit::setrlimit(Resource::STACK, u64::MAX, u64::MAX) {
50
        Ok(_) => {}
51
        Err(e) => error!("Error setting rlimit: {}", e),
52
    }
53
    tracing_subscriber::fmt()
54
        .json()
55
        .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
56
        .init();
57
    let args: Vec<String> = std::env::args().collect();
58
    if args.len() < 3 || args[1] != "-f" {
59
        error!("Usage: bonsol-node -f <config_file>");
60
        return Ok(());
61
    }
62
    let config_file = &args[2];
63
    let config = config::load_config(config_file);
64
    let program = Pubkey::from_str(&config.bonsol_program)?;
65
    if let MetricsConfig::Prometheus {} = config.metrics_config {
66
        let builder = PrometheusBuilder::new();
67
        builder
68
            .install()
69
            .expect("failed to install prometheus exporter");
70
        info!("Prometheus exporter installed");
71
    }
72
    emit_event!(MetricEvents::BonsolStartup, up => true);
73
    //todo use traits for signer
74
    let signer = match config.signer_config.clone() {
75
        SignerConfig::KeypairFile { path } => {
76
            info!("Using Keypair File");
77
            read_keypair_file(&path).map_err(|_| CliError::InvalidSigner)?
78
        }
79
    };
80
    let signer_identity = signer.pubkey();
81
    //Todo traitify ingester
82
    let mut ingester: Box<dyn Ingester> = match config.ingester_config.clone() {
83
        IngesterConfig::RpcBlockSubscription { wss_rpc_url } => {
84
            info!("Using RPC Block Subscription");
85
            Box::new(RpcIngester::new(wss_rpc_url))
86
        }
87
        IngesterConfig::GrpcSubscription {
88
            grpc_url,
89
            token,
90
            connection_timeout_secs,
91
            timeout_secs,
92
        } => {
93
            info!("Using GRPC Subscription");
94
            Box::new(GrpcIngester::new(
95
                grpc_url,
96
                token,
97
                Some(connection_timeout_secs),
98
                Some(timeout_secs),
99
            ))
100
        }
101
        _ => return Err(CliError::InvalidIngester.into()),
102
    };
103

            
104
    let (mut transaction_sender, solana_rpc_client) = match config.transaction_sender_config.clone()
105
    {
106
        TransactionSenderConfig::Rpc { rpc_url } => (
107
            RpcTransactionSender::new(rpc_url.clone(), program, signer),
108
            RpcClient::new(rpc_url),
109
        ),
110
        _ => return Err(CliError::InvalidRpcUrl.into()),
111
    };
112
    transaction_sender.start();
113
    let input_resolver = DefaultInputResolver::new_with_opts(
114
        Arc::new(reqwest::Client::new()),
115
        Arc::new(solana_rpc_client),
116
        Some(config.max_input_size_mb),
117
        Some(Duration::from_secs(
118
            config.image_download_timeout_secs as u64,
119
        )),
120
    );
121
    //may take time to load images, depending on the number of images TODO put limit
122
    let mut runner = Risc0Runner::new(
123
        config.clone(),
124
        signer_identity,
125
        Arc::new(transaction_sender),
126
        Arc::new(input_resolver),
127
    )
128
    .await?;
129
    let runner_chan = runner.start()?;
130
    let mut ingester_chan = ingester.start(program)?;
131
    let handle = tokio::spawn(async move {
132
        while let Some(bix) = ingester_chan.recv().await {
133
            for ix in bix {
134
                println!("Sending to runner");
135
                runner_chan.send(ix).unwrap();
136
            }
137
        }
138
    });
139
    select! {
140
        e = handle => {
141
            info!("Runner exited: {:?}", e);
142
            let _ = ingester.stop();
143
            let _ = runner.stop();
144
        },
145
        _ = signal::ctrl_c() => {
146
            info!("Received Ctrl-C");
147
            exit(1);
148
        },
149
    }
150
    info!("Exited");
151

            
152
    Ok(())
153
}