1
use std::{collections::HashMap, time::Duration};
2

            
3
use tracing::error;
4

            
5
use {
6
    anyhow::anyhow,
7
    solana_sdk::{message::AccountKeys, transaction::VersionedTransaction},
8
    solana_transaction_status::TransactionStatusMeta,
9
    yellowstone_grpc_proto::geyser::SubscribeUpdate,
10
};
11

            
12
use crate::types::{filter_bonsol_instructions, BonsolInstruction};
13

            
14
use {
15
    super::{Ingester, TxChannel},
16
    anyhow::Result,
17
    futures::stream::StreamExt,
18
    solana_sdk::{message::VersionedMessage, pubkey::Pubkey},
19
    tokio::sync::mpsc::UnboundedSender,
20
    yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcClient},
21
    yellowstone_grpc_proto::{
22
        convert_from::create_tx_with_meta,
23
        prelude::{
24
            subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterTransactions,
25
        },
26
    },
27
};
28

            
29
pub struct GrpcIngester {
30
    url: String,
31
    token: String,
32
    connection_timeout_secs: Option<u32>,
33
    timeout_secs: Option<u32>,
34
    op_handle: Option<tokio::task::JoinHandle<Result<()>>>,
35
}
36

            
37
impl GrpcIngester {
38
    pub const fn new(
39
        url: String,
40
        token: String,
41
        connection_timeout_secs: Option<u32>,
42
        timeout_secs: Option<u32>,
43
    ) -> Self {
44
        GrpcIngester {
45
            url,
46
            token,
47
            connection_timeout_secs,
48
            timeout_secs,
49
            op_handle: None,
50
        }
51
    }
52
    pub fn url(&self) -> &str {
53
        &self.url
54
    }
55
}
56

            
57
impl<'a> TryFrom<&'a mut GrpcIngester> for GeyserGrpcBuilder {
58
    type Error = anyhow::Error;
59
    fn try_from(value: &'a mut GrpcIngester) -> Result<Self, Self::Error> {
60
        Ok(GeyserGrpcClient::build_from_shared(value.url.clone())?
61
            .x_token(Some(value.token.clone()))?
62
            .connect_timeout(Duration::from_secs(
63
                value.connection_timeout_secs.unwrap_or(10) as u64,
64
            ))
65
            .timeout(Duration::from_secs(value.timeout_secs.unwrap_or(10) as u64)))
66
    }
67
}
68

            
69
impl Ingester for GrpcIngester {
70
    fn start(&mut self, program: Pubkey) -> Result<TxChannel> {
71
        let (txchan, rx) = tokio::sync::mpsc::unbounded_channel();
72
        let stream_client = GeyserGrpcBuilder::try_from(&mut *self)?;
73
        self.op_handle = Some(tokio::spawn(async move {
74
            ingest(program, txchan, stream_client).await
75
        }));
76
        Ok(rx)
77
    }
78

            
79
    fn stop(&mut self) -> Result<()> {
80
        if let Some(t) = self.op_handle.as_mut() {
81
            t.abort()
82
        }
83
        Ok(())
84
    }
85
}
86

            
87
async fn ingest(
88
    program: Pubkey,
89
    txchan: UnboundedSender<Vec<BonsolInstruction>>,
90
    stream_client: GeyserGrpcBuilder,
91
) -> Result<()> {
92
    let mut client = stream_client.connect().await?;
93
    let mut txmap = HashMap::new();
94
    txmap.insert(
95
        program.to_string(),
96
        SubscribeRequestFilterTransactions {
97
            vote: Some(false),
98
            failed: Some(false),
99
            account_required: vec![program.to_string()],
100
            ..Default::default()
101
        },
102
    );
103
    let (_, mut stream) = client
104
        .subscribe_with_request(Some(SubscribeRequest {
105
            transactions: txmap,
106
            ..Default::default()
107
        }))
108
        .await?;
109

            
110
    while let Some(message) = stream.next().await {
111
        match message {
112
            Ok(msg) => {
113
                if let Err(e) = handle_msg(msg, program, &txchan) {
114
                    error!("Error in stream: {e:?}")
115
                }
116
            }
117
            Err(e) => error!("Error in stream: {e:?}"),
118
        }
119
    }
120
    Ok(())
121
}
122

            
123
fn handle_msg(
124
    msg: SubscribeUpdate,
125
    program: Pubkey,
126
    txchan: &UnboundedSender<Vec<BonsolInstruction>>,
127
) -> Result<()> {
128
    if let Some(UpdateOneof::Transaction(txw)) = msg.update_oneof {
129
        txw.transaction.map(|tx| -> Result<()> {
130
            create_tx_with_meta(tx)
131
                .map(|soltxn| {
132
                    try_send_instructions(
133
                        program,
134
                        txw.slot,
135
                        soltxn.account_keys(),
136
                        soltxn.get_transaction(),
137
                        soltxn.get_status_meta(),
138
                        txchan,
139
                    )
140
                })
141
                .map_err(|e| anyhow!("error while sending instructions: {e}"))?
142
        });
143
    }
144
    Ok(())
145
}
146

            
147
2
fn try_send_instructions(
148
2
    program: Pubkey,
149
2
    last_known_block: u64,
150
2
    acc: AccountKeys,
151
2
    txndata: VersionedTransaction,
152
2
    meta: Option<TransactionStatusMeta>,
153
2
    txchan: &UnboundedSender<Vec<BonsolInstruction>>,
154
2
) -> Result<()> {
155
2
    let program_filter = |acc: &AccountKeys, program: &Pubkey, index: usize| -> bool {
156
2
        acc.get(index).is_some_and(|p| p == program)
157
2
    };
158

            
159
2
    if let VersionedMessage::V0(msg) = txndata.message {
160
1
        let mut bonsolixs: Vec<BonsolInstruction> = filter_bonsol_instructions(
161
1
            msg.instructions,
162
1
            &acc,
163
1
            &program,
164
1
            last_known_block,
165
1
            program_filter,
166
1
        )
167
1
        .collect();
168
1
        if let Some(metadata) = meta {
169
1
            if let Some(inner_ix) = metadata.inner_instructions {
170
1
                bonsolixs.extend(inner_ix.into_iter().flat_map(|ix| {
171
1
                    filter_bonsol_instructions(
172
1
                        ix.instructions,
173
1
                        &acc,
174
1
                        &program,
175
1
                        last_known_block,
176
1
                        program_filter,
177
1
                    )
178
1
                }));
179
1
            }
180
        }
181
1
        if !bonsolixs.is_empty() {
182
1
            txchan.send(bonsolixs).map_err(|e| {
183
                anyhow!(
184
                    "failed to send instructions to txn ingest channel: {:?}",
185
                    e.0
186
                )
187
1
            })?
188
        }
189
1
    }
190
2
    Ok(())
191
2
}
192

            
193
#[cfg(test)]
mod dragon_ingester_tests {
    use std::str::FromStr;

    use {
        expect_test::{expect, Expect},
        solana_sdk::{
            instruction::CompiledInstruction,
            message::{v0::Message, AccountKeys, Message as LegacyMessage, VersionedMessage},
            pubkey::Pubkey,
            transaction::VersionedTransaction,
        },
        solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta},
    };

    use {super::try_send_instructions, crate::types::BonsolInstruction};

    /// Compares the pretty-printed debug form of `output` with the snapshot.
    fn check_instructions(output: &[BonsolInstruction], expect: Expect) {
        let rendered = format!("{output:#?}");
        expect.assert_eq(&rendered);
    }

    /// Wraps `instructions` in an otherwise default transaction whose message
    /// is legacy when `legacy` is true and V0 otherwise.
    fn create_test_compiled_tx(
        instructions: Vec<CompiledInstruction>,
        legacy: bool,
    ) -> VersionedTransaction {
        let message = if legacy {
            VersionedMessage::Legacy(LegacyMessage {
                instructions,
                ..LegacyMessage::default()
            })
        } else {
            VersionedMessage::V0(Message {
                instructions,
                ..Message::default()
            })
        };
        VersionedTransaction {
            message,
            ..VersionedTransaction::default()
        }
    }

    /// Builds otherwise default status metadata carrying `instructions` as its
    /// inner instructions.
    fn create_test_inner_tx(instructions: Vec<InnerInstructions>) -> TransactionStatusMeta {
        TransactionStatusMeta {
            inner_instructions: Some(instructions),
            ..TransactionStatusMeta::default()
        }
    }

    /// The single instruction shared by both tests: program index 0, four zero
    /// data bytes, account indices 0 and 1.
    fn sample_instruction() -> CompiledInstruction {
        CompiledInstruction::new_from_raw_parts(0, vec![0, 0, 0, 0], vec![0, 1])
    }

    /// Status metadata holding one inner-instruction set that contains
    /// `sample_instruction`.
    fn sample_meta() -> TransactionStatusMeta {
        create_test_inner_tx(vec![InnerInstructions {
            index: 0,
            instructions: vec![InnerInstruction {
                instruction: sample_instruction(),
                stack_height: None,
            }],
        }])
    }

    #[tokio::test]
    async fn v1_txns_pass() {
        let (txchan, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let txndata = create_test_compiled_tx(vec![sample_instruction()], false);
        let meta = sample_meta();
        let program = Pubkey::from_str("1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM")
            .expect("failed to create pubkey from str");
        let static_keys = vec![program];
        let acc = AccountKeys::new(&static_keys, None);

        try_send_instructions(program, 1, acc, txndata, Some(meta), &txchan)
            .expect("failed to send instructions");

        assert!(!rx.is_empty());
        let bonsol_ixs = rx
            .recv()
            .await
            .expect("expected a non-empty vector of BonsolInstructions");

        check_instructions(
            &bonsol_ixs,
            expect![[r#"
            [
                BonsolInstruction {
                    cpi: false,
                    accounts: [
                        1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM,
                    ],
                    data: [
                        0,
                        0,
                        0,
                        0,
                    ],
                    last_known_block: 1,
                },
                BonsolInstruction {
                    cpi: true,
                    accounts: [
                        1111111QLbz7JHiBTspS962RLKV8GndWFwiEaqKM,
                    ],
                    data: [
                        0,
                        0,
                        0,
                        0,
                    ],
                    last_known_block: 1,
                },
            ]"#]],
        );
    }

    #[tokio::test]
    async fn legacy_txns_fail() {
        let (txchan, rx) = tokio::sync::mpsc::unbounded_channel();

        let txndata = create_test_compiled_tx(vec![sample_instruction()], true);
        let meta = sample_meta();
        let program = Pubkey::new_unique();
        let static_keys = vec![program];
        let acc = AccountKeys::new(&static_keys, None);

        try_send_instructions(program, 1, acc, txndata, Some(meta), &txchan)
            .expect("failed to send instructions");

        // A legacy message is ignored, so nothing may reach the channel.
        assert!(rx.is_empty())
    }
}