1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
extern crate foundationdb as fdb;
extern crate futures;
extern crate rand;
extern crate stopwatch;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate structopt;

use std::sync::atomic::*;
use std::sync::Arc;

use futures::future::*;
use rand::prelude::*;
use rand::rngs::mock::StepRng;
use stopwatch::Stopwatch;
use structopt::StructOpt;

use crate::fdb::*;

#[derive(Clone)]
struct Counter {
    size: usize,
    inner: Arc<AtomicUsize>,
}
impl Counter {
    fn new(size: usize) -> Self {
        Self {
            size,
            inner: Default::default(),
        }
    }

    fn decr(&self, n: usize) -> bool {
        let val = self.inner.fetch_add(n, Ordering::SeqCst);
        val < self.size
    }
}

#[derive(Clone)]
struct Bench {
    db: Arc<Database>,
    opt: Opt,
}

impl Bench {
    fn run(self) {
        let opt = &self.opt;
        let counter = Counter::new(opt.count);

        let mut handles = Vec::new();

        let sw = Stopwatch::start_new();

        let step = (opt.queue_depth + opt.threads - 1) / opt.threads;
        let mut start = 0;
        for _ in 0..opt.threads {
            let end = std::cmp::min(start + step, opt.queue_depth);

            let range = start..end;
            let counter = counter.clone();
            let b = self.clone();
            let handle = std::thread::spawn(move || {
                futures::executor::block_on(b.run_range(range, counter))
            });
            handles.push(handle);

            start = end;
        }

        for handle in handles {
            handle
                .join()
                .expect("failed to join")
                .expect("failed to run bench");
        }

        let elapsed = sw.elapsed_ms() as usize;

        info!(
            "bench took: {:?} ms, {:?} tps",
            elapsed,
            1000 * opt.count / elapsed
        );
    }

    async fn run_range(&self, r: std::ops::Range<usize>, counter: Counter) -> FdbResult<()> {
        try_join_all(r.map(|n| {
            // With deterministic Rng, benchmark with same parameters will overwrite same set
            // of keys again, which makes benchmark result stable.
            let rng = StepRng::new(n as u64, 1);
            self.run_bench(rng, counter.clone())
        }))
        .await?;
        Ok(())
    }

    async fn run_bench(&self, mut rng: StepRng, counter: Counter) -> FdbResult<()> {
        let mut key_buf = vec![0; self.opt.key_len];

        let mut val_buf = vec![0; self.opt.val_len];

        let trx_batch_size = self.opt.trx_batch_size;
        let mut trx = self.db.create_trx()?;

        loop {
            for _ in 0..trx_batch_size {
                rng.fill_bytes(&mut key_buf);
                rng.fill_bytes(&mut val_buf);
                key_buf[0] = 0x01;
                trx.set(&key_buf, &val_buf);
            }

            trx = trx.commit().await?.reset();

            if !counter.decr(trx_batch_size) {
                break Ok(());
            }
        }
    }
}

#[derive(StructOpt, Debug, Clone)]
#[structopt(name = "fdb-bench")]
struct Opt {
    #[structopt(short = "t", long = "threads", default_value = "1")]
    threads: usize,

    #[structopt(short = "q", long = "queue-depth", default_value = "1000")]
    queue_depth: usize,

    #[structopt(short = "c", long = "count", default_value = "300000")]
    count: usize,

    #[structopt(long = "trx-batch-size", default_value = "10")]
    trx_batch_size: usize,

    #[structopt(long = "key-len", default_value = "10")]
    key_len: usize,
    #[structopt(long = "val-len", default_value = "100")]
    val_len: usize,
}

fn main() {
    env_logger::init();
    let opt = Opt::from_args();
    info!("opt: {:?}", opt);

    let _guard = unsafe { foundationdb::boot() };
    let db = Arc::new(
        futures::executor::block_on(fdb::Database::new_compat(None))
            .expect("failed to get database"),
    );

    let bench = Bench { db, opt };
    bench.run();
}