1
use {
2
    super::{Ingester, IngesterResult, TxChannel},
3
    crate::{
4
        ingest::{IngestError, IngestErrorType},
5
        types::BonsolInstruction,
6
    },
7
    anyhow::Result,
8
    solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
9
    solana_rpc_client_api::config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
10
    solana_sdk::{bs58, commitment_config::CommitmentConfig, pubkey::Pubkey},
11
    solana_transaction_status::{
12
        EncodedTransactionWithStatusMeta, UiInnerInstructions, UiInstruction, UiTransactionEncoding,
13
    },
14
    tokio::{sync::mpsc::UnboundedSender, task::JoinHandle},
15
    tracing::error,
16
};
17

            
18
use futures_util::StreamExt;
19

            
20
pub struct RpcIngester {
21
    rpc_url: String,
22
    op_handle: Option<JoinHandle<()>>,
23
}
24

            
25
impl RpcIngester {
    /// Creates an ingester bound to `rpc_url`.
    ///
    /// Nothing is spawned here: the task handle stays `None` until `start` runs.
    pub const fn new(rpc_url: String) -> RpcIngester {
        Self {
            rpc_url,
            op_handle: None,
        }
    }
}
33

            
34
// TODO: find a way to consume the decoded transaction without cloning instruction data
35
fn filter_txs(
36
    program: &Pubkey,
37
    last_known_block: u64,
38
    tx: EncodedTransactionWithStatusMeta,
39
) -> Vec<BonsolInstruction> {
40
    let mut res = vec![];
41

            
42
    if let Some(dtx) = tx.transaction.decode() {
43
        let scc = dtx.message.static_account_keys();
44
        if let Some(meta) = tx.meta {
45
            if meta.err.is_some() {
46
                return res;
47
            }
48
            for ix in dtx.message.instructions().iter() {
49
                if ix.program_id(scc) == program {
50
                    res.push(BonsolInstruction {
51
                        cpi: false,
52
                        accounts: ix.accounts.iter().map(|a| scc[*a as usize]).collect(),
53
                        data: ix.data.clone(),
54
                        last_known_block,
55
                    });
56
                }
57
            }
58
            let o_ix_groups: Option<Vec<UiInnerInstructions>> = meta.inner_instructions.into();
59
            if let Some(inner_ix_groups) = o_ix_groups {
60
                for group in inner_ix_groups {
61
                    for ix in group.instructions {
62
                        if let UiInstruction::Compiled(instruction) = ix {
63
                            if &scc[instruction.program_id_index as usize] == program {
64
                                let data = bs58::decode(&instruction.data).into_vec();
65
                                if let Ok(data) = data {
66
                                    res.push(BonsolInstruction {
67
                                        cpi: true,
68
                                        accounts: instruction
69
                                            .accounts
70
                                            .iter()
71
                                            .map(|a| scc[*a as usize])
72
                                            .collect(),
73
                                        data,
74
                                        last_known_block,
75
                                    });
76
                                } else {
77
                                    error!("Failed to decode bs58 data for bonsol instruction");
78
                                }
79
                            }
80
                        }
81
                    }
82
                }
83
            }
84
        }
85
    }
86
    res
87
}
88

            
89
async fn ingest(
90
    rpc_url: String,
91
    program: Pubkey,
92
    txchan: UnboundedSender<Vec<BonsolInstruction>>,
93
) -> IngesterResult {
94
    let c = PubsubClient::new(&rpc_url).await.map_err(|e| IngestError {
95
        code: IngestErrorType::RpcError,
96
        message: e.to_string(),
97
    })?;
98

            
99
    let (mut stream, _unsub) = c
100
        .block_subscribe(
101
            RpcBlockSubscribeFilter::MentionsAccountOrProgram(program.to_string()),
102
            Some(RpcBlockSubscribeConfig {
103
                encoding: Some(UiTransactionEncoding::Base64),
104
                max_supported_transaction_version: Some(0),
105
                show_rewards: Some(false),
106
                commitment: Some(CommitmentConfig::confirmed()),
107
                transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
108
            }),
109
        )
110
        .await
111
        .map_err(|e| IngestError {
112
            code: IngestErrorType::RpcError,
113
            message: e.to_string(),
114
        })?;
115
    while let Some(msg) = stream.next().await {
116
        if let Some(blk) = msg.value.block {
117
            if let Some(txs) = blk.transactions {
118
                let ix = txs
119
                    .into_iter()
120
                    .flat_map(|tx| {
121
                        filter_txs(&program, blk.block_height.unwrap_or(blk.parent_slot), tx)
122
                    })
123
                    .collect::<Vec<BonsolInstruction>>();
124
                txchan.send(ix).unwrap();
125
            }
126
        }
127
    }
128
    Ok(())
129
}
130

            
131
impl Ingester for RpcIngester {
132
    fn start(&mut self, program: Pubkey) -> Result<TxChannel> {
133
        let (txchan, rx) = tokio::sync::mpsc::unbounded_channel();
134
        let rpc_url = self.rpc_url.clone();
135
        self.op_handle = Some(tokio::spawn(async move {
136
            let mut retry = 10;
137
            loop {
138
                let res = ingest(rpc_url.clone(), program, txchan.clone()).await;
139
                if let Err(e) = res {
140
                    error!("Error in ingester: {:?} retrying ", e);
141
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
142
                    if retry == 0 {
143
                        break;
144
                    }
145
                    retry -= 1;
146
                }
147
            }
148
        }));
149
        Ok(rx)
150
    }
151

            
152
    fn stop(&mut self) -> Result<()> {
153
        if let Some(t) = self.op_handle.as_mut() {
154
            t.abort()
155
        }
156
        Ok(())
157
    }
158
}