1
use std::sync::Arc;
2

            
3
use tracing::error;
4

            
5
use {
6
    async_trait::async_trait,
7
    bonsol_interface::{
8
        bonsol_schema::{
9
            ChannelInstruction, ChannelInstructionArgs, ChannelInstructionIxType, ClaimV1,
10
            ClaimV1Args, StatusTypes, StatusV1, StatusV1Args,
11
        },
12
        util::{deployment_address, execution_address, execution_claim_address},
13
    },
14
    dashmap::DashMap,
15
    flatbuffers::FlatBufferBuilder,
16
    itertools::Itertools,
17
    solana_rpc_client_api::config::RpcSendTransactionConfig,
18
    solana_sdk::{
19
        account::Account,
20
        commitment_config::CommitmentConfig,
21
        message::{v0, VersionedMessage},
22
        signature::Signature,
23
        signer::SignerError,
24
        system_program,
25
        transaction::VersionedTransaction,
26
    },
27
    solana_transaction_status::TransactionStatus as TransactionConfirmationStatus,
28
    tokio::task::JoinHandle,
29
};
30

            
31
use {
32
    crate::types::ProgramExec,
33
    anyhow::Result,
34
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
35
    solana_sdk::{
36
        instruction::{AccountMeta, Instruction},
37
        pubkey::Pubkey,
38
        signature::Keypair,
39
        signer::Signer,
40
    },
41
    tracing::info,
42
};
43

            
44
#[derive(Debug, Clone, PartialEq)]
45
pub enum TransactionStatus {
46
    Pending { expiry: u64 },
47
    Confirmed(TransactionConfirmationStatus),
48
}
49

            
50
#[async_trait]
51
pub trait TransactionSender {
52
    fn start(&mut self);
53
    async fn claim(
54
        &self,
55
        execution_id: &str,
56
        requester: Pubkey,
57
        execution_account: Pubkey,
58
        block_commitment: u64,
59
    ) -> Result<Signature>;
60
    async fn submit_proof(
61
        &self,
62
        execution_id: &str,
63
        requester_account: Pubkey,
64
        callback_exec: Option<ProgramExec>,
65
        proof: &[u8],
66
        execution_digest: &[u8],
67
        input_digest: &[u8],
68
        assumption_digest: &[u8],
69
        committed_outputs: &[u8],
70
        additional_accounts: Vec<AccountMeta>,
71
        exit_code_system: u32,
72
        exit_code_user: u32,
73
    ) -> Result<Signature>;
74
    async fn get_current_block(&self) -> Result<u64>;
75
    fn get_signature_status(&self, sig: &Signature) -> Option<TransactionStatus>;
76
    fn clear_signature_status(&self, sig: &Signature);
77
    async fn get_deployment_account(&self, image_id: &str) -> Result<Account>;
78
}
79

            
80
pub struct RpcTransactionSender {
81
    pub rpc_client: Arc<RpcClient>,
82
    pub bonsol_program: Pubkey,
83
    pub signer: Keypair,
84
    pub txn_status_handle: Option<JoinHandle<()>>,
85
    pub sigs: Arc<DashMap<Signature, TransactionStatus>>,
86
}
87

            
88
impl Signer for RpcTransactionSender {
89
    fn pubkey(&self) -> Pubkey {
90
        self.signer.pubkey()
91
    }
92

            
93
    fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
94
        Ok(self.signer.pubkey())
95
    }
96

            
97
    fn sign_message(&self, message: &[u8]) -> Signature {
98
        self.signer.sign_message(message)
99
    }
100

            
101
    fn try_sign_message(
102
        &self,
103
        message: &[u8],
104
    ) -> std::result::Result<Signature, solana_sdk::signer::SignerError> {
105
        self.signer.try_sign_message(message)
106
    }
107

            
108
    fn is_interactive(&self) -> bool {
109
        false
110
    }
111
}
112

            
113
impl RpcTransactionSender {
114
    pub fn new(rpc_url: String, bonsol_program: Pubkey, signer: Keypair) -> Self {
115
        Self {
116
            rpc_client: Arc::new(RpcClient::new(rpc_url)),
117
            signer,
118
            bonsol_program,
119
            txn_status_handle: None,
120
            sigs: Arc::new(DashMap::new()),
121
        }
122
    }
123
}
124

            
125
#[async_trait]
126
impl TransactionSender for RpcTransactionSender {
127
    fn get_signature_status(&self, sig: &Signature) -> Option<TransactionStatus> {
128
        self.sigs.get(sig).map(|status| status.value().to_owned())
129
    }
130

            
131
    fn clear_signature_status(&self, sig: &Signature) {
132
        self.sigs.remove(sig);
133
    }
134

            
135
    async fn claim(
136
        &self,
137
        execution_id: &str,
138
        requester: Pubkey,
139
        execution_account: Pubkey,
140
        block_commitment: u64,
141
    ) -> Result<Signature> {
142
        let (execution_claim_account, _) = execution_claim_address(execution_account.as_ref());
143
        let accounts = vec![
144
            AccountMeta::new(execution_account, false),
145
            AccountMeta::new_readonly(requester, false),
146
            AccountMeta::new(execution_claim_account, false),
147
            AccountMeta::new(self.signer.pubkey(), true),
148
            AccountMeta::new(self.signer.pubkey(), true),
149
            AccountMeta::new_readonly(system_program::id(), false),
150
        ];
151
        let mut fbb = FlatBufferBuilder::new();
152
        let eid = fbb.create_string(execution_id);
153
        let stat = ClaimV1::create(
154
            &mut fbb,
155
            &ClaimV1Args {
156
                block_commitment,
157
                execution_id: Some(eid),
158
            },
159
        );
160
        fbb.finish(stat, None);
161
        let statbytes = fbb.finished_data();
162
        let mut fbb2 = FlatBufferBuilder::new();
163
        let off = fbb2.create_vector(statbytes);
164
        let root = ChannelInstruction::create(
165
            &mut fbb2,
166
            &ChannelInstructionArgs {
167
                ix_type: ChannelInstructionIxType::ClaimV1,
168
                claim_v1: Some(off),
169
                ..Default::default()
170
            },
171
        );
172
        fbb2.finish(root, None);
173
        let ix_data = fbb2.finished_data();
174
        let instruction = Instruction::new_with_bytes(self.bonsol_program, ix_data, accounts);
175
        let (blockhash_req, last_valid) = self
176
            .rpc_client
177
            .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
178
            .await
179
            .map_err(|e| anyhow::anyhow!("Failed to get blockhash: {:?}", e))?;
180

            
181
        let msg =
182
            v0::Message::try_compile(&self.signer.pubkey(), &[instruction], &[], blockhash_req)?;
183
        let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[&self.signer])?;
184
        let sig = self
185
            .rpc_client
186
            .send_transaction_with_config(
187
                &tx,
188
                RpcSendTransactionConfig {
189
                    skip_preflight: true,
190
                    ..Default::default()
191
                },
192
            )
193
            .await
194
            .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
195
        self.sigs
196
            .insert(sig, TransactionStatus::Pending { expiry: last_valid });
197
        Ok(sig)
198
    }
199

            
200
    async fn submit_proof(
201
        &self,
202
        execution_id: &str,
203
        requester_account: Pubkey,
204
        callback_exec: Option<ProgramExec>,
205
        proof: &[u8],
206
        execution_digest: &[u8],
207
        input_digest: &[u8],
208
        assumption_digest: &[u8],
209
        committed_outputs: &[u8],
210
        additional_accounts: Vec<AccountMeta>,
211
        exit_code_system: u32,
212
        exit_code_user: u32,
213
    ) -> Result<Signature> {
214
        let (execution_request_data_account, _) =
215
            execution_address(&requester_account, execution_id.as_bytes());
216
        let (id, additional_accounts) = match callback_exec {
217
            None => (self.bonsol_program, vec![]),
218
            Some(pe) => {
219
                let prog = pe.program_id;
220
                //todo: add read interface simulation on program to get other accounts
221
                (prog, additional_accounts)
222
            }
223
        };
224

            
225
        let mut accounts = vec![
226
            AccountMeta::new(requester_account, false),
227
            AccountMeta::new(execution_request_data_account, false),
228
            AccountMeta::new_readonly(id, false),
229
            AccountMeta::new(self.signer.pubkey(), true),
230
        ];
231
        accounts.extend(additional_accounts);
232
        let mut fbb = FlatBufferBuilder::new();
233
        let proof_vec = fbb.create_vector(proof);
234
        let execution_digest = fbb.create_vector(execution_digest);
235
        let input_digest = fbb.create_vector(input_digest);
236
        let assumption_digest = fbb.create_vector(assumption_digest);
237
        let eid = fbb.create_string(execution_id);
238
        let out = fbb.create_vector(committed_outputs);
239
        let stat = StatusV1::create(
240
            &mut fbb,
241
            &StatusV1Args {
242
                execution_id: Some(eid),                    //0-?? bytes lets say 16
243
                status: StatusTypes::Completed,             //1 byte
244
                proof: Some(proof_vec),                     //256 bytes
245
                execution_digest: Some(execution_digest),   //32 bytes
246
                input_digest: Some(input_digest),           //32 bytes
247
                assumption_digest: Some(assumption_digest), //32 bytes
248
                committed_outputs: Some(out),               //0-?? bytes lets say 32
249
                exit_code_system,                           //4 byte
250
                exit_code_user,                             //4 byte
251
            }, //total ~408 bytes plenty of room for more stuff
252
        );
253
        fbb.finish(stat, None);
254
        let statbytes = fbb.finished_data();
255
        let mut fbb2 = FlatBufferBuilder::new();
256
        let off = fbb2.create_vector(statbytes);
257
        let root = ChannelInstruction::create(
258
            &mut fbb2,
259
            &ChannelInstructionArgs {
260
                ix_type: ChannelInstructionIxType::StatusV1,
261
                status_v1: Some(off),
262
                ..Default::default()
263
            },
264
        );
265
        fbb2.finish(root, None);
266
        let ix_data = fbb2.finished_data();
267
        let instruction = Instruction::new_with_bytes(self.bonsol_program, ix_data, accounts);
268
        let (blockhash, last_valid) = self
269
            .rpc_client
270
            .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
271
            .await
272
            .map_err(|e| anyhow::anyhow!("Failed to get blockhash: {:?}", e))?;
273

            
274
        let msg = v0::Message::try_compile(&self.signer.pubkey(), &[instruction], &[], blockhash)?;
275
        let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[&self.signer])?;
276

            
277
        let sig = self
278
            .rpc_client
279
            .send_and_confirm_transaction_with_spinner_and_config(
280
                &tx,
281
                CommitmentConfig::confirmed(),
282
                RpcSendTransactionConfig {
283
                    skip_preflight: true,
284
                    ..Default::default()
285
                },
286
            )
287
            .await
288
            .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
289
        self.sigs
290
            .insert(sig, TransactionStatus::Pending { expiry: last_valid });
291
        Ok(sig)
292
    }
293

            
294
    fn start(&mut self) {
295
        let sigs_ref = self.sigs.clone();
296
        let rpc_client = self.rpc_client.clone();
297
        self.txn_status_handle = Some(tokio::spawn(async move {
298
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
299
            loop {
300
                interval.tick().await;
301
                let current_block_height = rpc_client
302
                    .get_block_height_with_commitment(rpc_client.commitment())
303
                    .await;
304

            
305
                if let Ok(current_block_height) = current_block_height {
306
                    sigs_ref.retain(|k, v| {
307
                        if let TransactionStatus::Pending { expiry } = v {
308
                            if *expiry < current_block_height {
309
                                info!("Transaction expired {}", k);
310
                                return false;
311
                            }
312
                        }
313
                        true
314
                    });
315
                    let all_sigs = sigs_ref.iter().map(|x| *x.key()).collect_vec();
316
                    let statuses = rpc_client.get_signature_statuses(&all_sigs).await;
317
                    if let Ok(statuses) = statuses {
318
                        for sig in all_sigs.into_iter().zip(statuses.value.into_iter()) {
319
                            if let Some(status) = sig.1 {
320
                                sigs_ref.insert(sig.0, TransactionStatus::Confirmed(status));
321
                            }
322
                        }
323
                    }
324
                } else {
325
                    error!("Failed to get block height");
326
                }
327
            }
328
        }));
329
    }
330

            
331
    async fn get_current_block(&self) -> Result<u64> {
332
        self.rpc_client
333
            .get_block_height()
334
            .await
335
            .map_err(|e| anyhow::anyhow!("{:?}", e))
336
    }
337

            
338
    async fn get_deployment_account(&self, image_id: &str) -> Result<Account> {
339
        let (deployment_account, _) = deployment_address(image_id);
340
        self.rpc_client
341
            .get_account(&deployment_account)
342
            .await
343
            .map_err(|e| anyhow::anyhow!("Failed to get account: {:?}", e))
344
    }
345
}