1
use {
2
    async_trait::async_trait,
3
    bonsol_interface::{
4
        bonsol_schema::{
5
            ChannelInstruction, ChannelInstructionArgs, ChannelInstructionIxType, ClaimV1,
6
            ClaimV1Args, StatusTypes, StatusV1, StatusV1Args,
7
        },
8
        util::{deployment_address, execution_address, execution_claim_address},
9
    },
10
    dashmap::DashMap,
11
    flatbuffers::FlatBufferBuilder,
12
    solana_rpc_client_api::config::RpcSendTransactionConfig,
13
    solana_sdk::{
14
        account::Account,
15
        commitment_config::CommitmentConfig,
16
        message::{v0, VersionedMessage},
17
        signature::Signature,
18
        signer::SignerError,
19
        system_program,
20
        transaction::VersionedTransaction,
21
    },
22
    tokio::task::JoinHandle,
23
};
24

            
25
use crate::transaction_sender::status::TransactionStatus;
26

            
27
use std::sync::Arc;
28
use tracing::error;
29

            
30
use {
31
    crate::types::ProgramExec,
32
    anyhow::Result,
33
    itertools::Itertools,
34
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
35
    solana_sdk::{
36
        instruction::{AccountMeta, Instruction},
37
        pubkey::Pubkey,
38
        signature::Keypair,
39
        signer::Signer,
40
    },
41
    tracing::info,
42
};
43

            
44
use crate::transaction_sender::transaction_sender::TransactionSender;
45

            
46
pub struct RpcTransactionSender {
47
    pub rpc_client: Arc<RpcClient>,
48
    pub bonsol_program: Pubkey,
49
    pub signer: Keypair,
50
    pub txn_status_handle: Option<JoinHandle<()>>,
51
    pub sigs: Arc<DashMap<Signature, TransactionStatus>>,
52
}
53

            
54
impl RpcTransactionSender {
55
    pub fn new(rpc_url: String, bonsol_program: Pubkey, signer: Keypair) -> Self {
56
        Self {
57
            rpc_client: Arc::new(RpcClient::new(rpc_url)),
58
            signer,
59
            bonsol_program,
60
            txn_status_handle: None,
61
            sigs: Arc::new(DashMap::new()),
62
        }
63
    }
64
}
65

            
66
impl Signer for RpcTransactionSender {
67
    fn pubkey(&self) -> Pubkey {
68
        self.signer.pubkey()
69
    }
70

            
71
    fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
72
        Ok(self.signer.pubkey())
73
    }
74

            
75
    fn sign_message(&self, message: &[u8]) -> Signature {
76
        self.signer.sign_message(message)
77
    }
78

            
79
    fn try_sign_message(
80
        &self,
81
        message: &[u8],
82
    ) -> std::result::Result<Signature, solana_sdk::signer::SignerError> {
83
        self.signer.try_sign_message(message)
84
    }
85

            
86
    fn is_interactive(&self) -> bool {
87
        false
88
    }
89
}
90

            
91
#[async_trait]
92
impl TransactionSender for RpcTransactionSender {
93
    /// Returns a copy of the status currently recorded for `sig`, or `None`
    /// when the signature is not tracked.
    fn get_signature_status(&self, sig: &Signature) -> Option<TransactionStatus> {
        let entry = self.sigs.get(sig)?;
        Some(entry.value().clone())
    }
96

            
97
    fn clear_signature_status(&self, sig: &Signature) {
98
        self.sigs.remove(sig);
99
    }
100

            
101
    async fn claim(
102
        &self,
103
        execution_id: &str,
104
        requester: Pubkey,
105
        execution_account: Pubkey,
106
        block_commitment: u64,
107
    ) -> Result<Signature> {
108
        let (execution_claim_account, _) = execution_claim_address(execution_account.as_ref());
109
        let accounts = vec![
110
            AccountMeta::new(execution_account, false),
111
            AccountMeta::new_readonly(requester, false),
112
            AccountMeta::new(execution_claim_account, false),
113
            AccountMeta::new(self.signer.pubkey(), true),
114
            AccountMeta::new(self.signer.pubkey(), true),
115
            AccountMeta::new_readonly(system_program::id(), false),
116
        ];
117
        let mut fbb = FlatBufferBuilder::new();
118
        let eid = fbb.create_string(execution_id);
119
        let stat = ClaimV1::create(
120
            &mut fbb,
121
            &ClaimV1Args {
122
                block_commitment,
123
                execution_id: Some(eid),
124
            },
125
        );
126
        fbb.finish(stat, None);
127
        let statbytes = fbb.finished_data();
128
        let mut fbb2 = FlatBufferBuilder::new();
129
        let off = fbb2.create_vector(statbytes);
130
        let root = ChannelInstruction::create(
131
            &mut fbb2,
132
            &ChannelInstructionArgs {
133
                ix_type: ChannelInstructionIxType::ClaimV1,
134
                claim_v1: Some(off),
135
                ..Default::default()
136
            },
137
        );
138
        fbb2.finish(root, None);
139
        let ix_data = fbb2.finished_data();
140
        let instruction = Instruction::new_with_bytes(self.bonsol_program, ix_data, accounts);
141
        let (blockhash_req, last_valid) = self
142
            .rpc_client
143
            .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
144
            .await
145
            .map_err(|e| anyhow::anyhow!("Failed to get blockhash: {:?}", e))?;
146

            
147
        let msg =
148
            v0::Message::try_compile(&self.signer.pubkey(), &[instruction], &[], blockhash_req)?;
149
        let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[&self.signer])?;
150
        let sig = self
151
            .rpc_client
152
            .send_transaction_with_config(
153
                &tx,
154
                RpcSendTransactionConfig {
155
                    skip_preflight: true,
156
                    ..Default::default()
157
                },
158
            )
159
            .await
160
            .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
161
        self.sigs
162
            .insert(sig, TransactionStatus::Pending { expiry: last_valid });
163
        Ok(sig)
164
    }
165

            
166
    async fn submit_proof(
167
        &self,
168
        execution_id: &str,
169
        requester_account: Pubkey,
170
        callback_exec: Option<ProgramExec>,
171
        proof: &[u8],
172
        execution_digest: &[u8],
173
        input_digest: &[u8],
174
        assumption_digest: &[u8],
175
        committed_outputs: &[u8],
176
        additional_accounts: Vec<AccountMeta>,
177
        exit_code_system: u32,
178
        exit_code_user: u32,
179
    ) -> Result<Signature> {
180
        let (execution_request_data_account, _) =
181
            execution_address(&requester_account, execution_id.as_bytes());
182
        let (id, additional_accounts) = match callback_exec {
183
            None => (self.bonsol_program, vec![]),
184
            Some(pe) => {
185
                let prog = pe.program_id;
186
                //todo: add read interface simulation on program to get other accounts
187
                (prog, additional_accounts)
188
            }
189
        };
190

            
191
        let mut accounts = vec![
192
            AccountMeta::new(requester_account, false),
193
            AccountMeta::new(execution_request_data_account, false),
194
            AccountMeta::new_readonly(id, false),
195
            AccountMeta::new(self.signer.pubkey(), true),
196
        ];
197
        accounts.extend(additional_accounts);
198
        let mut fbb = FlatBufferBuilder::new();
199
        let proof_vec = fbb.create_vector(proof);
200
        let execution_digest = fbb.create_vector(execution_digest);
201
        let input_digest = fbb.create_vector(input_digest);
202
        let assumption_digest = fbb.create_vector(assumption_digest);
203
        let eid = fbb.create_string(execution_id);
204
        let out = fbb.create_vector(committed_outputs);
205
        let stat = StatusV1::create(
206
            &mut fbb,
207
            &StatusV1Args {
208
                execution_id: Some(eid),                    //0-?? bytes lets say 16
209
                status: StatusTypes::Completed,             //1 byte
210
                proof: Some(proof_vec),                     //256 bytes
211
                execution_digest: Some(execution_digest),   //32 bytes
212
                input_digest: Some(input_digest),           //32 bytes
213
                assumption_digest: Some(assumption_digest), //32 bytes
214
                committed_outputs: Some(out),               //0-?? bytes lets say 32
215
                exit_code_system,                           //4 byte
216
                exit_code_user,                             //4 byte
217
            }, //total ~408 bytes plenty of room for more stuff
218
        );
219
        fbb.finish(stat, None);
220
        let statbytes = fbb.finished_data();
221
        let mut fbb2 = FlatBufferBuilder::new();
222
        let off = fbb2.create_vector(statbytes);
223
        let root = ChannelInstruction::create(
224
            &mut fbb2,
225
            &ChannelInstructionArgs {
226
                ix_type: ChannelInstructionIxType::StatusV1,
227
                status_v1: Some(off),
228
                ..Default::default()
229
            },
230
        );
231
        fbb2.finish(root, None);
232
        let ix_data = fbb2.finished_data();
233
        let instruction = Instruction::new_with_bytes(self.bonsol_program, ix_data, accounts);
234
        let (blockhash, last_valid) = self
235
            .rpc_client
236
            .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
237
            .await
238
            .map_err(|e| anyhow::anyhow!("Failed to get blockhash: {:?}", e))?;
239

            
240
        let msg = v0::Message::try_compile(&self.signer.pubkey(), &[instruction], &[], blockhash)?;
241
        let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[&self.signer])?;
242

            
243
        let sig = self
244
            .rpc_client
245
            .send_and_confirm_transaction_with_spinner_and_config(
246
                &tx,
247
                CommitmentConfig::confirmed(),
248
                RpcSendTransactionConfig {
249
                    skip_preflight: true,
250
                    ..Default::default()
251
                },
252
            )
253
            .await
254
            .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
255
        self.sigs
256
            .insert(sig, TransactionStatus::Pending { expiry: last_valid });
257
        Ok(sig)
258
    }
259

            
260
    fn start(&mut self) {
261
        let sigs_ref = self.sigs.clone();
262
        let rpc_client = self.rpc_client.clone();
263
        self.txn_status_handle = Some(tokio::spawn(async move {
264
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
265
            loop {
266
                interval.tick().await;
267
                let current_block_height = rpc_client
268
                    .get_block_height_with_commitment(rpc_client.commitment())
269
                    .await;
270

            
271
                if let Ok(current_block_height) = current_block_height {
272
                    sigs_ref.retain(|k, v| {
273
                        if let TransactionStatus::Pending { expiry } = v {
274
                            if *expiry < current_block_height {
275
                                info!("Transaction expired {}", k);
276
                                return false;
277
                            }
278
                        }
279
                        true
280
                    });
281
                    let all_sigs = sigs_ref.iter().map(|x| *x.key()).collect_vec();
282
                    let statuses = rpc_client.get_signature_statuses(&all_sigs).await;
283
                    if let Ok(statuses) = statuses {
284
                        for sig in all_sigs.into_iter().zip(statuses.value.into_iter()) {
285
                            if let Some(status) = sig.1 {
286
                                sigs_ref.insert(sig.0, TransactionStatus::Confirmed(status));
287
                            }
288
                        }
289
                    }
290
                } else {
291
                    error!("Failed to get block height");
292
                }
293
            }
294
        }));
295
    }
296

            
297
    async fn get_current_block(&self) -> Result<u64> {
298
        self.rpc_client
299
            .get_block_height()
300
            .await
301
            .map_err(|e| anyhow::anyhow!("{:?}", e))
302
    }
303

            
304
    async fn get_deployment_account(&self, image_id: &str) -> Result<Account> {
305
        let (deployment_account, _) = deployment_address(image_id);
306
        self.rpc_client
307
            .get_account(&deployment_account)
308
            .await
309
            .map_err(|e| anyhow::anyhow!("Failed to get account: {:?}", e))
310
    }
311
}