1
mod utils;
2
pub mod verify_prover_version;
3

            
4
use crate::transaction_sender::TransactionStatus;
5

            
6
use {
7
    solana_sdk::instruction::AccountMeta,
8
    utils::{check_stark_compression_tools_path, check_x86_64arch},
9
};
10

            
11
use {
12
    crate::{
13
        config::ProverNodeConfig,
14
        observe::*,
15
        risc0_runner::utils::async_to_json,
16
        transaction_sender::{RpcTransactionSender, TransactionSender},
17
        MissingImageStrategy,
18
    },
19
    bonsol_interface::{
20
        bonsol_schema::{ClaimV1, DeployV1, ExecutionRequestV1},
21
        prover_version::{ProverVersion, VERSION_V1_2_1},
22
    },
23
    dashmap::DashMap,
24
    risc0_binfmt::MemoryImage,
25
    risc0_zkvm::{ExitCode, Journal, SuccinctReceipt},
26
    solana_sdk::{pubkey::Pubkey, signature::Signature},
27
    std::{
28
        convert::TryInto, env::consts::ARCH, fs, io::Cursor, path::Path, sync::Arc, time::Duration,
29
    },
30
};
31

            
32
use {
33
    crate::types::{BonsolInstruction, ProgramExec},
34
    anyhow::Result,
35
    bonsol_interface::bonsol_schema::{parse_ix_data, root_as_deploy_v1, ChannelInstructionIxType},
36
    bonsol_prover::{
37
        image::Image,
38
        input_resolver::{InputResolver, ProgramInput},
39
        prover::{get_risc0_prover, new_risc0_exec_env},
40
        util::get_body_max_size,
41
    },
42
    risc0_groth16::{ProofJson, Seal},
43
    risc0_zkvm::{
44
        recursion::identity_p254,
45
        sha::{Digest, Digestible},
46
        InnerReceipt, MaybePruned, ReceiptClaim, VerifierContext,
47
    },
48
    tempfile::tempdir,
49
    thiserror::Error,
50
    tokio::{
51
        fs::File, io::AsyncReadExt, process::Command, sync::mpsc::UnboundedSender, task::JoinHandle,
52
    },
53
    tracing::{error, info, warn},
54
    verify_prover_version::verify_prover_version,
55
};
56

            
57
/// Prover version this runner works with: `start` verifies it via `verify_prover_version`, and
/// execution requests carrying any other version are rejected with `InvalidProverVersion`.
const REQUIRED_PROVER: ProverVersion = VERSION_V1_2_1;
58

            
59
#[derive(Debug, Error)]
60
pub enum Risc0RunnerError {
61
    #[error("Empty instruction")]
62
    EmptyInstruction,
63
    #[error("Invalid data")]
64
    InvalidData,
65
    #[error("Img too large")]
66
    ImgTooLarge,
67
    #[error("Img load error")]
68
    ImgLoadError,
69
    #[error("Image Data Unavailable")]
70
    ImageDataUnavailable,
71
    #[error("Image download error")]
72
    ImageDownloadError(#[from] anyhow::Error),
73
    #[error("Transaction error")]
74
    TransactionError(String),
75
    #[error("Error with proof compression")]
76
    ProofCompressionError,
77
    #[error("Error with proof generation")]
78
    ProofGenerationError,
79
    #[error("Invalid prover version {0}, expected {1}")]
80
    InvalidProverVersion(ProverVersion, ProverVersion),
81
}
82

            
83
/// Progress of a proof this node is tracking in its in-flight map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClaimStatus {
    /// A claim transaction has been sent for the execution request.
    Claiming,
    /// The proof has been submitted; a submission signature is recorded.
    Submitted,
}
88

            
89
#[derive(Clone, Debug, PartialEq, Eq)]
90
pub struct InflightProof {
91
    pub execution_id: String,
92
    pub image_id: String,
93
    pub status: ClaimStatus,
94
    pub claim_signature: Signature,
95
    pub submission_signature: Option<Signature>,
96
    pub expiry: u64,
97
    pub requester: Pubkey,
98
    pub program_callback: Option<ProgramExec>,
99
    pub additional_accounts: Vec<AccountMeta>,
100
}
101

            
102
type InflightProofs = Arc<DashMap<String, InflightProof>>;
103
type InflightProofRef<'a> = &'a DashMap<String, InflightProof>;
104

            
105
type LoadedImageMap = Arc<DashMap<String, Image>>;
106
type LoadedImageMapRef<'a> = &'a DashMap<String, Image>;
107

            
108
type InputStagingArea = Arc<DashMap<String, Vec<ProgramInput>>>;
109
type InputStagingAreaRef<'a> = &'a DashMap<String, Vec<ProgramInput>>;
110

            
111
pub struct Risc0Runner {
112
    config: Arc<ProverNodeConfig>,
113
    loaded_images: LoadedImageMap,
114
    worker_handle: Option<JoinHandle<Result<()>>>,
115
    inflight_proof_worker_handle: Option<JoinHandle<Result<()>>>,
116
    txn_sender: Arc<RpcTransactionSender>,
117
    input_staging_area: InputStagingArea,
118
    self_identity: Arc<Pubkey>,
119
    inflight_proofs: InflightProofs,
120
    input_resolver: Arc<dyn InputResolver + 'static>,
121
}
122

            
123
impl Risc0Runner {
124
    pub async fn new(
125
        config: ProverNodeConfig,
126
        self_identity: Pubkey,
127
        txn_sender: Arc<RpcTransactionSender>,
128
        input_resolver: Arc<dyn InputResolver + 'static>,
129
    ) -> Result<Risc0Runner> {
130
        if !check_x86_64arch() {
131
            warn!("Bonsol node will not compress STARKs to SNARKs after successful risc0vm\nproving due to stark compression tooling requiring x86_64 architectures - virtualization will also fail");
132
        }
133

            
134
        check_stark_compression_tools_path(&config.stark_compression_tools_path)?;
135

            
136
        if !std::path::Path::new(&config.risc0_image_folder).exists() {
137
            fs::create_dir_all(&config.risc0_image_folder)?;
138
        }
139

            
140
        let dir = fs::read_dir(&config.risc0_image_folder)?;
141
        let loaded_images = DashMap::new();
142
        for entry in dir {
143
            let entry = entry?;
144
            if entry.file_type()?.is_file() {
145
                let img = Image::new(entry.path()).await?;
146
                info!("Loaded image: {}", &img.id);
147
                loaded_images.insert(img.id.clone(), img);
148
            }
149
        }
150

            
151
        Ok(Risc0Runner {
152
            config: Arc::new(config),
153
            loaded_images: Arc::new(loaded_images),
154
            worker_handle: None,
155
            inflight_proof_worker_handle: None,
156
            txn_sender,
157
            input_staging_area: Arc::new(DashMap::new()),
158
            self_identity: Arc::new(self_identity),
159
            inflight_proofs: Arc::new(DashMap::new()),
160
            input_resolver,
161
        })
162
    }
163

            
164
    // TODO: break up the pipeline into smaller domains to make it easier to test.
    // Break into image handling, input handling and execution requests.
    // Inputs and images should be services used by this prover.
167
    pub fn start(&mut self) -> Result<UnboundedSender<BonsolInstruction>> {
168
        verify_prover_version(REQUIRED_PROVER)
169
            .expect("Bonsol build conflict: prover version is not supported");
170
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<BonsolInstruction>();
171
        let loaded_images = self.loaded_images.clone();
172
        // TODO: move image handling out of prover
173
        let img_client = Arc::new(
174
            reqwest::Client::builder()
175
                .timeout(Duration::from_secs(
176
                    self.config.image_download_timeout_secs as u64,
177
                ))
178
                .build()?,
179
        );
180
        let config = self.config.clone();
181
        let self_id = self.self_identity.clone();
182
        let input_staging_area = self.input_staging_area.clone();
183
        let inflight_proofs = self.inflight_proofs.clone();
184
        let txn_sender = self.txn_sender.clone();
185
        self.inflight_proof_worker_handle = Some(tokio::spawn(async move {
186
            let mut interval = tokio::time::interval(Duration::from_secs(1));
187
            loop {
188
                interval.tick().await;
189
                let current_block = txn_sender.get_current_block().await.unwrap_or(0);
190
                inflight_proofs.retain(|_, v| {
191
                    if v.expiry < current_block {
192
                        emit_event!(MetricEvents::ProofExpired, execution_id => v.execution_id.clone());
193
                        return false;
194
                    }
195
                    match &v.status {
196
                        ClaimStatus::Claiming => {
197
                            let sig = v.claim_signature;
198
                            let inner_status = txn_sender.get_signature_status(&sig);
199
                            return match inner_status {
200
                                None => false,
201
                                Some(status) => {
202
                                    match status {
203
                                        TransactionStatus::Confirmed(status) => {
204
                                            txn_sender.clear_signature_status(&sig);
205
                                            if status.err.is_some() {
206
                                                info!("Claim Transaction Failed");
207

            
208
                                            }
209
                                            status.err.is_none()
210
                                        },
211
                                        _ => true
212
                                    }
213
                                }
214
                            };
215
                        }
216
                        ClaimStatus::Submitted => {
217
                            if let Some(sig) = v.submission_signature.as_ref() {
218
                                let inner_status = txn_sender.get_signature_status(sig);
219
                                return match inner_status {
220
                                    None => false,
221
                                    Some(status) => {
222
                                        match status {
223
                                            TransactionStatus::Confirmed(status) => {
224
                                                txn_sender.clear_signature_status(sig);
225
                                                if status.err.is_some() {
226
                                                    emit_event!(MetricEvents::ProofSubmissionError, sig => sig.to_string());
227
                                                }
228
                                                status.err.is_none()
229
                                            },
230
                                            _ => true
231
                                        }
232
                                    }
233
                                };
234
                            }
235
                        }
236
                    };
237
                    true
238
                });
239
            }
240
        }));
241

            
242
        let inflight_proofs = self.inflight_proofs.clone();
243
        let txn_sender = self.txn_sender.clone();
244
        let input_resolver = self.input_resolver.clone();
245
        self.worker_handle = Some(tokio::spawn(async move {
246
            while let Some(bix) = rx.recv().await {
247
                let txn_sender = txn_sender.clone();
248
                let loaded_images = loaded_images.clone();
249
                let config = config.clone();
250
                let img_client = img_client.clone();
251
                let input_resolver = input_resolver.clone();
252
                let self_id = self_id.clone();
253
                let input_staging_area = input_staging_area.clone();
254
                let inflight_proofs = inflight_proofs.clone();
255
                tokio::spawn(async move {
256
                    let bonsol_ix_type =
257
                        parse_ix_data(&bix.data).map_err(|_| Risc0RunnerError::InvalidData)?;
258
                    let result = match bonsol_ix_type.ix_type() {
259
                        ChannelInstructionIxType::DeployV1 => {
260
                            let payload = bonsol_ix_type
261
                                .deploy_v1_nested_flatbuffer()
262
                                .ok_or::<anyhow::Error>(
263
                                Risc0RunnerError::EmptyInstruction.into(),
264
                            )?;
265
                            emit_counter!(MetricEvents::ImageDeployment, 1, "image_id" => payload.image_id().unwrap_or_default());
266
                            handle_image_deployment(&config, &img_client, payload, &loaded_images)
267
                                .await
268
                        }
269
                        ChannelInstructionIxType::ExecuteV1 => {
270
                            info!("Received execution request");
271
                            // Evaluate the execution request and decide if it should be claimed
272
                            let payload = bonsol_ix_type
273
                                .execute_v1_nested_flatbuffer()
274
                                .ok_or::<anyhow::Error>(
275
                                Risc0RunnerError::EmptyInstruction.into(),
276
                            )?;
277
                            let er_prover_version: ProverVersion = payload
278
                                .prover_version()
279
                                .try_into()
280
                                .map_err::<anyhow::Error, _>(|_| {
281
                                    Risc0RunnerError::InvalidProverVersion(
282
                                        ProverVersion::UnsupportedVersion,
283
                                        REQUIRED_PROVER,
284
                                    )
285
                                    .into()
286
                                })?;
287
                            if er_prover_version != REQUIRED_PROVER {
288
                                return Err(Risc0RunnerError::InvalidProverVersion(
289
                                    er_prover_version,
290
                                    REQUIRED_PROVER,
291
                                )
292
                                .into());
293
                            }
294
                            handle_execution_request(
295
                                &config,
296
                                &inflight_proofs,
297
                                input_resolver.clone(),
298
                                img_client.clone(),
299
                                &txn_sender,
300
                                &loaded_images,
301
                                &input_staging_area,
302
                                bix.last_known_block,
303
                                payload,
304
                                &bix.accounts,
305
                            )
306
                            .await
307
                        }
308
                        ChannelInstructionIxType::ClaimV1 => {
309
                            info!("Claim Event");
310
                            let payload = bonsol_ix_type
311
                                .claim_v1_nested_flatbuffer()
312
                                .ok_or::<anyhow::Error>(
313
                                Risc0RunnerError::EmptyInstruction.into(),
314
                            )?;
315
                            handle_claim(
316
                                &config,
317
                                &self_id,
318
                                &inflight_proofs,
319
                                input_resolver.clone(),
320
                                &txn_sender,
321
                                &loaded_images,
322
                                &input_staging_area,
323
                                payload,
324
                                &bix.accounts,
325
                            )
326
                            .await
327
                        }
328
                        ChannelInstructionIxType::StatusV1 => Ok(()),
329
                        _ => {
330
                            info!("Unknown instruction type");
331
                            Ok(())
332
                        }
333
                    };
334
                    if result.is_err() {
335
                        info!("Error: {:?}", result);
336
                    }
337
                    result
338
                });
339
            }
340
            Ok(())
341
        }));
342
        Ok(tx)
343
    }
344

            
345
    /// Stops the background tasks started by `start`.
    ///
    /// Aborts both the instruction dispatcher and the in-flight proof sweep, if they are running.
    /// Calling this before `start`, or more than once, is a no-op rather than a panic.
    /// Per-instruction tasks already spawned by the dispatcher are detached and are not aborted.
    ///
    /// # Errors
    /// Currently never fails; the `Result` is kept for interface stability.
    pub fn stop(&mut self) -> Result<()> {
        if let Some(handle) = self.worker_handle.take() {
            handle.abort();
        }
        // The sweep loops forever, so it has to be aborted explicitly or it outlives the runner's
        // logical lifetime (and would run twice after a later `start`).
        if let Some(handle) = self.inflight_proof_worker_handle.take() {
            handle.abort();
        }
        Ok(())
    }
349
}
350

            
351
pub async fn handle_claim<'a>(
352
    config: &ProverNodeConfig,
353
    self_identity: &Pubkey,
354
    in_flight_proofs: InflightProofRef<'a>,
355
    input_resolver: Arc<dyn InputResolver + 'static>,
356
    transaction_sender: &RpcTransactionSender,
357
    loaded_images: LoadedImageMapRef<'a>,
358
    input_staging_area: InputStagingAreaRef<'a>,
359
    claim: ClaimV1<'a>,
360
    accounts: &[Pubkey], // need to create canonical parsing of accounts per instruction type for my flatbuffer model or use shank
361
) -> Result<()> {
362
    info!("Received claim event");
363
    let claimer = accounts[3];
364
    let execution_id = claim.execution_id().ok_or(Risc0RunnerError::InvalidData)?;
365
    if &claimer != self_identity {
366
        let attempt = in_flight_proofs.remove(execution_id);
367
        if let Some((ifp, claim)) = attempt {
368
            if let ClaimStatus::Claiming = claim.status {
369
                transaction_sender.clear_signature_status(&claim.claim_signature);
370
                emit_event!(MetricEvents::ClaimMissed, execution_id => ifp, signature => &claim.claim_signature.to_string());
371
            }
372
        }
373
        return Ok(());
374
    }
375

            
376
    let claim_status = in_flight_proofs
377
        .get(execution_id)
378
        .map(|v| v.value().to_owned());
379
    if let Some(mut claim) = claim_status {
380
        emit_event!(MetricEvents::ClaimReceived, execution_id => execution_id);
381
        if let ClaimStatus::Claiming = claim.status {
382
            if let Some(image) = loaded_images.get(&claim.image_id) {
383
                if image.data.is_none() {
384
                    return Err(Risc0RunnerError::ImageDataUnavailable.into());
385
                }
386
                //if image is not loaded at claim, fail
387
                let mut inputs = input_staging_area
388
                    .get(execution_id)
389
                    .ok_or(Risc0RunnerError::InvalidData)?
390
                    .value()
391
                    .clone(); //clone soe we dont hold a reference over http requests
392
                let unresolved_count = inputs
393
                    .iter()
394
                    .filter(|i| match i {
395
                        ProgramInput::Unresolved(_) => true,
396
                        _ => false,
397
                    })
398
                    .count();
399

            
400
                if unresolved_count > 0 {
401
                    info!("{} outstanding inputs", unresolved_count);
402

            
403
                    emit_event_with_duration!(MetricEvents::InputDownload, {
404
                        input_resolver.resolve_private_inputs(execution_id, &mut inputs, Arc::new(transaction_sender)).await?;
405
                    }, execution_id => execution_id, stage => "private");
406
                    input_staging_area.insert(execution_id.to_string(), inputs);
407
                    // one of the huge problems with the claim system is that we are not guaranteed to have
408
                    // the inputs we need at the time we claim and no way to
409
                }
410
                info!("{} inputs resolved", unresolved_count);
411

            
412
                // drain the inputs and own them here, this is a bit of a hack but it works
413
                let (eid, inputs) = input_staging_area
414
                    .remove(execution_id)
415
                    .ok_or(Risc0RunnerError::InvalidData)?;
416
                let mem_image = image.get_memory_image()?;
417
                let result: Result<
418
                    (Journal, Digest, SuccinctReceipt<ReceiptClaim>),
419
                    Risc0RunnerError,
420
                > = tokio::task::spawn_blocking(move || {
421
                    risc0_prove(mem_image, inputs).map_err(|e| {
422
                        info!("Error generating proof: {:?}", e);
423
                        Risc0RunnerError::ProofGenerationError
424
                    })
425
                })
426
                .await?;
427
                match result {
428
                    Ok((journal, assumptions_digest, receipt)) => {
429
                        let compressed_receipt = risc0_compress_proof(
430
                            config.stark_compression_tools_path.as_str(),
431
                            receipt,
432
                        )
433
                        .await
434
                        .map_err(|e| {
435
                            info!("Error compressing proof: {:?}", e);
436
                            Risc0RunnerError::ProofCompressionError
437
                        })?;
438

            
439
                        let (input_digest, committed_outputs) = journal.bytes.split_at(32);
440
                        let sig = transaction_sender
441
                            .submit_proof(
442
                                &eid,
443
                                claim.requester,
444
                                claim.program_callback.clone(),
445
                                &compressed_receipt.proof,
446
                                &compressed_receipt.execution_digest,
447
                                input_digest,
448
                                assumptions_digest.as_bytes(),
449
                                committed_outputs,
450
                                claim.additional_accounts.clone(),
451
                                compressed_receipt.exit_code_system,
452
                                compressed_receipt.exit_code_user,
453
                            )
454
                            .await
455
                            .map_err(|e| {
456
                                error!("Error submitting proof: {:?}", e);
457
                                Risc0RunnerError::TransactionError(e.to_string())
458
                            })?;
459

            
460
                        claim.status = ClaimStatus::Submitted;
461
                        claim.submission_signature = Some(sig);
462
                        in_flight_proofs.insert(eid.clone(), claim);
463
                        info!("Proof submitted: {:?}", sig);
464
                    }
465
                    Err(e) => {
466
                        info!("Error generating proof: {:?}", e);
467
                    }
468
                };
469
                in_flight_proofs.remove(&eid);
470
            } else {
471
                info!("Image not loaded, fatal error aborting execution");
472
            }
473
        }
474
    }
475
    Ok(())
476
}
477

            
478
async fn handle_execution_request<'a>(
479
    config: &ProverNodeConfig,
480
    in_flight_proofs: InflightProofRef<'a>,
481
    input_resolver: Arc<dyn InputResolver + 'static>,
482
    img_client: Arc<reqwest::Client>,
483
    transaction_sender: &RpcTransactionSender,
484
    loaded_images: LoadedImageMapRef<'a>,
485
    input_staging_area: InputStagingAreaRef<'a>,
486
    _execution_block: u64,
487
    exec: ExecutionRequestV1<'a>,
488
    accounts: &[Pubkey],
489
) -> Result<()> {
490
    if !can_execute(exec) {
491
        warn!(
492
            "Execution request for incompatible prover version: {:?}",
493
            exec.prover_version()
494
        );
495
        emit_event!(MetricEvents::IncompatibleProverVersion, execution_id => exec.execution_id().unwrap_or_default());
496
        return Ok(());
497
    }
498

            
499
    // current naive implementation is to accept everything we have pending capacity for on this node, but this needs work
500
    let inflight = in_flight_proofs.len();
501
    emit_event!(MetricEvents::ExecutionRequest, execution_id => exec.execution_id().unwrap_or_default());
502
    if inflight < config.maximum_concurrent_proofs as usize {
503
        let eid = exec
504
            .execution_id()
505
            .map(|d| d.to_string())
506
            .ok_or(Risc0RunnerError::InvalidData)?;
507
        let image_id = exec
508
            .image_id()
509
            .map(|d| d.to_string())
510
            .ok_or(Risc0RunnerError::InvalidData)?;
511
        let expiry = exec.max_block_height();
512
        let img = loaded_images.get(&image_id);
513
        let img = if img.is_none() {
514
            match config.missing_image_strategy {
515
                MissingImageStrategy::DownloadAndClaim => {
516
                    info!("Image not loaded, attempting to load and running claim");
517
                    load_image(
518
                        config,
519
                        transaction_sender,
520
                        &img_client,
521
                        &image_id,
522
                        loaded_images,
523
                    )
524
                    .await?;
525
                    loaded_images.get(&image_id)
526
                }
527
                MissingImageStrategy::DownloadAndMiss => {
528
                    info!("Image not loaded, loading and rejecting claim");
529
                    load_image(
530
                        config,
531
                        transaction_sender,
532
                        &img_client,
533
                        &image_id,
534
                        loaded_images,
535
                    )
536
                    .await?;
537
                    None
538
                }
539
                MissingImageStrategy::Fail => {
540
                    info!("Image not loaded, rejecting claim");
541
                    None
542
                }
543
            }
544
        } else {
545
            img
546
        }
547
        .ok_or(Risc0RunnerError::ImgLoadError)?;
548

            
549
        // naive compute cost estimate which is YES WE CAN DO THIS in the default amount of time
550
        emit_histogram!(MetricEvents::ImageComputeEstimate, img.size  as f64, image_id => image_id.clone());
551
        //ensure compute can happen before expiry
552
        //execution_block + (image_compute_estimate % config.max_compute_per_block) + 1 some bogus calc
553
        let computable_by = expiry / 2;
554

            
555
        if computable_by < expiry {
556
            //the way this is done can cause race conditions where so many request come in a short time that we accept
557
            // them before we change the value of g so we optimistically change to inflight and we will decrement if we dont win the claim
558
            let inputs = exec.input().ok_or(Risc0RunnerError::InvalidData)?;
559
            let program_inputs = emit_event_with_duration!(MetricEvents::InputDownload, {
560
                input_resolver.resolve_public_inputs(
561
                    inputs.iter().map(|i| i.unpack()).collect()
562
                ).await?
563
            }, execution_id => eid, stage => "public");
564
            input_staging_area.insert(eid.clone(), program_inputs);
565
            let sig = transaction_sender
566
                .claim(&eid, accounts[0], accounts[2], computable_by)
567
                .await
568
                .map_err(|e| Risc0RunnerError::TransactionError(e.to_string()));
569
            match sig {
570
                Ok(sig) => {
571
                    let callback_program = exec
572
                        .callback_program_id()
573
                        .and_then::<[u8; 32], _>(|v| v.bytes().try_into().ok())
574
                        .map(Pubkey::from);
575
                    let callback = if callback_program.is_some() {
576
                        Some(ProgramExec {
577
                            program_id: callback_program.unwrap(),
578
                            instruction_prefix: exec
579
                                .callback_instruction_prefix()
580
                                .map(|v| v.bytes().to_vec())
581
                                .unwrap_or(vec![0x1]),
582
                        })
583
                    } else {
584
                        None
585
                    };
586

            
587
                    in_flight_proofs.insert(
588
                        eid.clone(),
589
                        InflightProof {
590
                            execution_id: eid.clone(),
591
                            image_id: image_id.clone(),
592
                            status: ClaimStatus::Claiming,
593
                            expiry,
594
                            claim_signature: sig,
595
                            submission_signature: None,
596
                            requester: accounts[0],
597
                            program_callback: callback,
598
                            additional_accounts: exec
599
                                .callback_extra_accounts()
600
                                .unwrap_or_default()
601
                                .into_iter()
602
                                .map(|a| {
603
                                    let pkbytes: [u8; 32] = a.pubkey().into();
604
                                    let pubkey = Pubkey::try_from(pkbytes).unwrap_or_default();
605
                                    let writable = a.writable();
606
                                    AccountMeta {
607
                                        pubkey,
608
                                        is_writable: writable == 1,
609
                                        is_signer: false,
610
                                    }
611
                                })
612
                                .collect(),
613
                        },
614
                    );
615
                    emit_event!(MetricEvents::ClaimAttempt, execution_id => eid);
616
                }
617
                Err(e) => {
618
                    info!("Error claiming: {:?}", e);
619
                    in_flight_proofs.remove(&eid);
620
                }
621
            }
622
        }
623
    }
624
    Ok(())
625
}
626

            
627
async fn load_image<'a>(
628
    config: &ProverNodeConfig,
629
    transaction_sender: &RpcTransactionSender,
630
    http_client: &reqwest::Client,
631
    image_id: &str,
632
    loaded_images: LoadedImageMapRef<'a>,
633
) -> Result<()> {
634
    let account = transaction_sender
635
        .get_deployment_account(image_id)
636
        .await
637
        .map_err(Risc0RunnerError::ImageDownloadError)?;
638
    let deploy_data = root_as_deploy_v1(&account.data)
639
        .map_err(|_| anyhow::anyhow!("Failed to parse account data"))?;
640
    handle_image_deployment(config, http_client, deploy_data, loaded_images).await?;
641
    Ok(())
642
}
643

            
644
async fn handle_image_deployment<'a>(
645
    config: &ProverNodeConfig,
646
    http_client: &reqwest::Client,
647
    deploy: DeployV1<'a>,
648
    loaded_images: LoadedImageMapRef<'a>,
649
) -> Result<()> {
650
    let url = deploy.url().ok_or(Risc0RunnerError::InvalidData)?;
651
    let size = deploy.size_();
652
    emit_histogram!(MetricEvents::ImageDownload, size as f64, url => url.to_string());
653
    emit_event_with_duration!(MetricEvents::ImageDownload, {
654
        let resp = http_client.get(url).send().await?.error_for_status()?;
655
        let min = std::cmp::min(size, (config.max_image_size_mb * 1024 * 1024) as u64) as usize;
656
        info!("Downloading image, size {} min {}", size, min);
657
        if resp.status().is_success() {
658
            let stream = resp.bytes_stream();
659
            let resp_data = get_body_max_size(stream, min)
660
                .await
661
                .map_err(|_|Risc0RunnerError::ImgTooLarge)?;
662

            
663
            let img = Image::from_bytes(resp_data)?;
664
            if let Some(bytes) = img.bytes() {
665
                tokio::fs::write(Path::new(&config.risc0_image_folder).join(img.id.clone()), bytes).await?;
666
            }
667
            if img.id != deploy.image_id().unwrap_or_default() {
668
                return Err(Risc0RunnerError::InvalidData.into());
669
            }
670
            loaded_images.insert(img.id.clone(), img);
671
        }
672
        Ok(())
673
    }, url => url.to_string())
674
}
675

            
676
// proving function, no async this is cpu/gpu intesive
677
fn risc0_prove(
678
    memory_image: MemoryImage,
679
    sorted_inputs: Vec<ProgramInput>,
680
) -> Result<(Journal, Digest, SuccinctReceipt<ReceiptClaim>)> {
681
    let image_id = memory_image.compute_id().to_string();
682
    let mut exec = new_risc0_exec_env(memory_image, sorted_inputs)?;
683
    let session = exec.run()?;
684
    // Obtain the default prover.
685
    let prover = get_risc0_prover()?;
686
    let ctx = VerifierContext::default();
687
    let info = emit_event_with_duration!(MetricEvents::ProofGeneration,{
688
        prover.prove_session(&ctx, &session)
689
    }, system => "risc0")?;
690
    emit_histogram!(MetricEvents::ProofSegments, info.stats.segments as f64, system => "risc0", image_id => &image_id);
691
    emit_histogram!(MetricEvents::ProofCycles, info.stats.total_cycles as f64, system => "risc0", cycle_type => "total", image_id => &image_id);
692
    emit_histogram!(MetricEvents::ProofCycles, info.stats.user_cycles as f64, system => "risc0", cycle_type => "user", image_id => &image_id);
693
    if let InnerReceipt::Composite(cr) = &info.receipt.inner {
694
        let sr = emit_event_with_duration!(MetricEvents::ProofConversion,{ prover.composite_to_succinct(cr) }, system => "risc0")?;
695
        let ident_receipt = identity_p254(&sr)?;
696
        if let MaybePruned::Value(rc) = sr.claim {
697
            if let MaybePruned::Value(Some(op)) = rc.output {
698
                if let MaybePruned::Value(ass) = op.assumptions {
699
                    return Ok((info.receipt.journal, ass.digest(), ident_receipt));
700
                }
701
            }
702
        }
703
    }
704
    Err(Risc0RunnerError::ProofGenerationError.into())
705
}
706

            
707
/// Compressed form of a succinct receipt, as produced by `risc0_compress_proof`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompressedReceipt {
    /// Digest bytes of the receipt claim's post state.
    pub execution_digest: Vec<u8>,
    /// System part of the exit code as assigned by `risc0_compress_proof`:
    /// `0` for `Halted`, `1` for `Paused`, `2` for `SystemSplit` and `SessionLimit`.
    pub exit_code_system: u32,
    /// User part of the exit code: the user exit value for `Halted`/`Paused`,
    /// `0` for `SystemSplit`, and `2` for `SessionLimit`.
    pub exit_code_user: u32,
    /// Bytes of the seal parsed from the generated proof.
    pub proof: Vec<u8>,
}
713
/// Compresses the proof to be sent to the blockchain
714
/// This is a temporary solution until the wasm groth16 prover or a rust impl is working
715
async fn risc0_compress_proof(
716
    tools_path: &str,
717
    succinct_receipt: SuccinctReceipt<ReceiptClaim>,
718
) -> Result<CompressedReceipt> {
719
    let sealbytes = succinct_receipt.get_seal_bytes();
720
    if !(ARCH == "x86_64" || ARCH == "x86") {
721
        panic!("X86 only");
722
    }
723
    let tmp = tempdir()?;
724
    let prove_dir = tmp.path();
725
    let root_path = Path::new(tools_path);
726
    let mut cursor = Cursor::new(&sealbytes);
727
    let inputs = prove_dir.join("input.json");
728
    let witness = prove_dir.join("out.wtns");
729
    let input_file = File::create(&inputs).await?;
730
    emit_event_with_duration!(MetricEvents::ProofConversion,{
731
        async_to_json(&mut cursor, input_file).await
732
    }, system => "groth16json")?;
733
    let zkey = root_path.join("stark_verify_final.zkey");
734
    let proof_out = prove_dir.join("proof.json");
735
    let public = prove_dir.join("public.json");
736
    emit_event_with_duration!(MetricEvents::ProofCompression,{
737
    let status = Command::new(root_path.join("stark_verify"))
738
        .arg(inputs.clone())
739
        .arg(witness.clone())
740
        .output()
741
        .await?;
742
    if !status.status.success() {
743
        info!("witness {:?}", status);
744
        return Err(Risc0RunnerError::ProofCompressionError.into());
745
    }
746
    let snark_status = Command::new(root_path.join("rapidsnark"))
747
        .arg(zkey)
748
        .arg(witness)
749
        .arg(proof_out.clone())
750
        .arg(public)
751
        .output()
752
        .await?;
753
    if !snark_status.status.success() {
754
        info!("snark {:?}", snark_status);
755
        return Err(Risc0RunnerError::ProofCompressionError.into());
756
    }
757
    }, system => "risc0");
758

            
759
    let mut proof_fd = File::open(proof_out).await?;
760
    let mt = proof_fd.metadata().await?;
761
    let mut bytes = Vec::with_capacity(mt.len() as usize);
762
    proof_fd.read_to_end(&mut bytes).await?;
763
    let proof: ProofJson = serde_json::from_slice(&bytes)?;
764
    let seal: Seal = proof.try_into()?;
765
    let claim = succinct_receipt.claim;
766
    if let MaybePruned::Value(rc) = claim {
767
        let (system, user) = match rc.exit_code {
768
            ExitCode::Halted(user_exit) => (0, user_exit),
769
            ExitCode::Paused(user_exit) => (1, user_exit),
770
            ExitCode::SystemSplit => (2, 0),
771
            ExitCode::SessionLimit => (2, 2),
772
        };
773
        Ok(CompressedReceipt {
774
            execution_digest: rc.post.digest().as_bytes().to_vec(),
775
            exit_code_system: system,
776
            exit_code_user: user,
777
            proof: seal.to_vec(),
778
        })
779
    } else {
780
        Err(Risc0RunnerError::ProofCompressionError.into())
781
    }
782
}
783

            
784
fn can_execute(exec: ExecutionRequestV1) -> bool {
785
    let version = exec.prover_version().try_into();
786
    if version.is_ok() {
787
        match version.unwrap() {
788
            REQUIRED_PROVER => true,
789
            _ => false,
790
        }
791
    } else {
792
        false
793
    }
794
}