diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/sg_tst_async.cpp | 382 |
1 files changed, 260 insertions, 122 deletions
diff --git a/examples/sg_tst_async.cpp b/examples/sg_tst_async.cpp index b7ad38b6..4ff60ab1 100644 --- a/examples/sg_tst_async.cpp +++ b/examples/sg_tst_async.cpp @@ -41,6 +41,7 @@ #include <fcntl.h> #include <stdio.h> #include <stdlib.h> +#include <stdarg.h> #include <string.h> #include <poll.h> #include <errno.h> @@ -55,15 +56,15 @@ #include "sg_lib.h" #include "sg_io_linux.h" -static const char * version_str = "1.01 20140710"; +static const char * version_str = "1.04 20140712"; static const char * util_name = "sg_tst_async"; /* This is a test program for checking the async usage of the Linux sg - * driver. Each thread opens 1 file descriptor to the sg device and then - * starts up to 16 commands while checking with the poll command for - * the completion of those commands. Each command has a unique "pack_id" - * which is a sequence starting at 1. Either TEST UNIT UNIT, READ(16) - * or WRITE(16) commands are issued. + * driver. Each thread opens 1 file descriptor to the given sg device and + * then starts up to 16 commands while checking with the poll command (or + * ioctl(SG_GET_NUM_WAITING) ) for the completion of those commands. Each + * command has a unique "pack_id" which is a sequence starting at 1. + * Either TEST UNIT UNIT, READ(16) or WRITE(16) commands are issued. * * This is C++ code with some things from C++11 (e.g. threads) and was * only just able to compile (when some things were reverted) with gcc/g++ @@ -74,7 +75,8 @@ static const char * util_name = "sg_tst_async"; * The build uses various object files from the <sg3_utils>/lib directory * which is assumed to be a sibling of this examples directory. Those * object files in the lib directory can be built with: - * cd <sg3_utils> ; ./configure ; cd lib; make + * cd <sg3_utils_package_root> ; ./configure ; cd lib; make + * cd ../examples * Then to build sg_tst_async concatenate the next 3 lines: * g++ -Wall -std=c++11 -pthread -I ../include ../lib/sg_lib.o * ../lib/sg_lib_data.o ../lib/sg_io_linux.o -o sg_tst_async @@ -85,10 +87,10 @@ static const char * util_name = "sg_tst_async"; * Currently this utility is Linux only and uses the sg driver. The bsg * driver is known to be broken (it doesn't match responses to the * correct file descriptor that requested them) so this utility won't - * be extended to bsg until that if fixed. + * be extended to bsg until that is fixed. * - * BEWARE: this utility will modify a logical block (default LBA 1000) on the - * given device when the '-W' option is given. + * BEWARE: >>> This utility will modify a logical block (default LBA 1000) + * on the given device when the '-W' option is given. * */ @@ -104,8 +106,9 @@ using namespace std::chrono; #define DEF_DIRECT 0 #define DEF_NO_XFER 0 -#define Q_PER_FD 16 +#define Q_PER_FD 16 /* sg driver per file descriptor limit */ #define MAX_CONSEC_NOMEMS 16 +#define URANDOM_DEV "/dev/urandom" #ifndef SG_FLAG_Q_AT_TAIL #define SG_FLAG_Q_AT_TAIL 0x10 @@ -120,6 +123,7 @@ using namespace std::chrono; #define EBUFF_SZ 256 static mutex console_mutex; +static mutex rand_lba_mutex; static atomic<int> async_starts(0); static atomic<int> async_finishes(0); static atomic<int> ebusy_count(0); @@ -130,6 +134,7 @@ static int page_size = 4096; /* rough guess, will ask sysconf() */ enum command2execute {SCSI_TUR, SCSI_READ16, SCSI_WRITE16}; enum blkQDiscipline {BQ_DEFAULT, BQ_AT_HEAD, BQ_AT_TAIL}; +enum myQDiscipline {MQD_LOW, MQD_MEDIUM, MQD_HIGH}; struct opts_t { const char * dev_name; @@ -137,47 +142,60 @@ struct opts_t { int num_per_thread; bool block; uint64_t lba; - unsigned int hi_lba; /* one after last */ + unsigned int hi_lba; /* last one, inclusive range */ int lb_sz; bool no_xfer; int verbose; int wait_ms; command2execute c2e; blkQDiscipline bqd; + myQDiscipline mqd; }; #if 0 class Rand_uint { public: - Rand_uint(unsigned int lo, unsigned int hi_p1) : p{lo, hi_p1} {} + Rand_uint(unsigned int lo, unsigned int hi) : p{lo, hi} {} unsigned int operator()() const { return r(); } private: uniform_int_distribution<unsigned int>::param_type p; auto r = bind(uniform_int_distribution<unsigned int>{p}, default_random_engine()); + /* compiler thinks auto should be a static, bs again? */ }; #endif + +#if 0 class Rand_uint { public: - Rand_uint(unsigned int lo, unsigned int hi_p1) - : r(bind(uniform_int_distribution<unsigned int>{lo, hi_p1}, - default_random_engine())) {} + Rand_uint(unsigned int lo, unsigned int hi, unsigned int my_seed) + : r(bind(uniform_int_distribution<unsigned int>{lo, hi}, + default_random_engine())) { r.seed(myseed); } unsigned int operator()() const { return r(); } private: function<unsigned int()> r; }; +#endif +class Rand_uint { +public: + Rand_uint(unsigned int lo, unsigned int hi, unsigned int my_seed) + : uid(lo, hi), dre(my_seed) { } + unsigned int get() { return uid(dre); } +private: + uniform_int_distribution<unsigned int> uid; + default_random_engine dre; +}; static void usage(void) { printf("Usage: %s [-d] [-f] [-h] [-l <lba+>] [-n <n_per_thr>] [-N]\n" - " [-q 0|1] [-R] [-s <lb_sz>] [-t <num_thrs>] " - "[-T]\n" - " [-v] [-V] [-w <wait_ms>] [-W] " - "<sg_disk_device>\n", - util_name); + " [-q 0|1] [-Q 0|1|2] [-R] [-s <lb_sz>]\n" + " [-t <num_thrs>] [-T] [-v] [-V] " + "[-w <wait_ms>]\n" + " [-W] <sg_disk_device>\n", util_name); printf(" where\n"); printf(" -d do direct_io (def: indirect)\n"); printf(" -f force: any sg device (def: only scsi_debug " @@ -192,6 +210,10 @@ usage(void) printf(" -N no data xfer (def: xfer on READ and " "WRITE)\n"); printf(" -q 0|1 0: blk q_at_head; 1: q_at_tail\n"); + printf(" -Q 0|1|2 0: favour completions (smaller q), 1: " + "medium,\n" + " 2: favour submissions (larger q, " + "default)\n"); printf(" -s <lb_sz> logical block size (def: 512)\n"); printf(" -R do READs (def: TUR)\n"); printf(" -t <num_thrs> number of threads (def: %d)\n", @@ -210,6 +232,68 @@ usage(void) "random sequence of LBAs.\n"); } +#ifdef __GNUC__ +static int pr2serr(const char * fmt, ...) + __attribute__ ((format (printf, 1, 2))); +static void pr_errno(int e_no, const char * fmt, ...) + __attribute__ ((format (printf, 2, 3))); +#else +static int pr2serr(const char * fmt, ...); +static void pr_errno(int e_no, const char * fmt, ...); +#endif + + +static int +pr2serr(const char * fmt, ...) +{ + int n; + + console_mutex.lock(); + { + va_list args; + + va_start(args, fmt); + n = vfprintf(stderr, fmt, args); + va_end(args); + } + console_mutex.unlock(); + return n; +} + +static void +pr_errno(int e_no, const char * fmt, ...) +{ + char b[128]; + + console_mutex.lock(); + { + va_list args; + + va_start(args, fmt); + vsnprintf(b, sizeof(b), fmt, args); + fprintf(stderr, "%s: %s\n", b, strerror(e_no)); + va_end(args); + } + console_mutex.unlock(); +} + +static unsigned int +get_urandom_uint(void) +{ + unsigned int res = 0; + int n; + unsigned char b[sizeof(unsigned int)]; + int fd = open(URANDOM_DEV, O_RDONLY); + + if (fd >= 0) { + /* assume this read is atomic */ + n = read(fd, b, sizeof(unsigned int)); + if (sizeof(unsigned int) == n) + memcpy(&res, b, sizeof(unsigned int)); + close(fd); + } + return res; +} #define TUR_CMD_LEN 6 #define READ16_REPLY_LEN 512 @@ -229,7 +313,7 @@ start_sg3_cmd(int sg_fd, command2execute cmd2exe, int pack_id, uint64_t lba, unsigned char w16CmdBlk[WRITE16_CMD_LEN] = {0x8a, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}; unsigned char sense_buffer[64]; - const char * np; + const char * np = NULL; memset(&pt, 0, sizeof(pt)); switch (cmd2exe) { @@ -288,11 +372,7 @@ start_sg3_cmd(int sg_fd, command2execute cmd2exe, int pack_id, uint64_t lba, this_thread::yield(); continue; } - console_mutex.lock(); - cerr << __func__ << ": " << np << " pack_id=" << pack_id << " [k=" << - k << "]" << endl; - perror(" write(sg)"); - console_mutex.unlock(); + pr_errno(errno, "%s: %s, pack_id=%d", __func__, np, pack_id); return -1; } return 0; @@ -336,9 +416,7 @@ finish_sg3_cmd(int sg_fd, command2execute cmd2exe, int & pack_id, int wait_ms, sleep(0); // process yield ?? } if (res < 0) { - console_mutex.lock(); - perror("do_rd_inc_wr_twice: read(sg, READ_16)"); - console_mutex.unlock(); + pr_errno(errno, "%s: %s", __func__, np); return -1; } /* now for the error processing */ @@ -349,10 +427,7 @@ finish_sg3_cmd(int sg_fd, command2execute cmd2exe, int & pack_id, int wait_ms, ok = 1; break; case SG_LIB_CAT_RECOVERED: - console_mutex.lock(); - fprintf(stderr, "%s: Recovered error on %s, continuing\n", - __func__, np); - console_mutex.unlock(); + pr2serr("%s: Recovered error on %s, continuing\n", __func__, np); ok = 1; break; default: /* won't bother decoding other categories */ @@ -378,9 +453,7 @@ get_aligned_heap(int bytes_at_least) #if 1 int err = posix_memalign(&wp, page_size, n); if (err) { - console_mutex.lock(); - fprintf(stderr, "posix_memalign: error [%d] out of memory?\n", err); - console_mutex.unlock(); + pr2serr("posix_memalign: error [%d] out of memory?\n", err); return NULL; } memset(wp, 0, n); @@ -391,10 +464,8 @@ get_aligned_heap(int bytes_at_least) memset(wp, 0, n); return (unsigned char *)wp; } else { - console_mutex.lock(); - fprintf(stderr, "get_aligned_heap: too fiddly to align, choose " - "smaller lb_sz\n"); - console_mutex.unlock(); + pr2serr("get_aligned_heap: too fiddly to align, choose smaller " + "lb_sz\n"); return NULL; } #endif @@ -406,38 +477,40 @@ work_thread(int id, struct opts_t * op) int thr_async_starts = 0; int thr_async_finishes = 0; unsigned int thr_eagain_count = 0; + unsigned int seed = 0; int k, n, res, sg_fd, num_outstanding, do_inc, num, pack_id, sg_flags; + int num_waiting_read, num_to_read; int open_flags = O_RDWR; + bool is_rw = (SCSI_TUR != op->c2e); char ebuff[EBUFF_SZ]; + uint64_t lba; unsigned char * lbp; const char * err = NULL; Rand_uint * ruip = NULL; - struct pollfd pfd; + struct pollfd pfd[1]; list<unsigned char *> free_lst; - map<int, unsigned char *> pi_map; + map<int, unsigned char *> pi_2_buff; + map<int, uint64_t> pi_2_lba; - if (op->verbose) { - console_mutex.lock(); - cerr << "Enter work_thread id=" << id << endl; - console_mutex.unlock(); - } + if (op->verbose) + pr2serr("Enter work_thread id=%d\n", id); if (! op->block) open_flags |= O_NONBLOCK; sg_fd = open(op->dev_name, open_flags); if (sg_fd < 0) { - snprintf(ebuff, EBUFF_SZ, "%s: id=%d, error opening file: %s", - __func__, id, op->dev_name); - console_mutex.lock(); - perror(ebuff); - console_mutex.unlock(); + pr_errno(errno, "%s: id=%d, error opening file: %s", __func__, id, + op->dev_name); return; } - pfd.fd = sg_fd; - pfd.events = POLLIN; - - if (op->hi_lba) - ruip = new Rand_uint((unsigned int)op->lba, op->hi_lba); + pfd[0].fd = sg_fd; + pfd[0].events = POLLIN; + if (is_rw && op->hi_lba) { + seed = get_urandom_uint(); + if (op->verbose > 1) + pr2serr(" id=%d, /dev/urandom seed=0x%x\n", id, seed); + ruip = new Rand_uint((unsigned int)op->lba, op->hi_lba, seed); + } sg_flags = 0; if (BQ_AT_TAIL == op->bqd) @@ -448,12 +521,9 @@ work_thread(int id, struct opts_t * op) sg_flags |= SG_FLAG_DIRECT_IO; if (op->no_xfer) sg_flags |= SG_FLAG_NO_DXFER; - if (op->verbose > 1) { - console_mutex.lock(); - fprintf(stderr, "sg_flags=0x%x, %s cmd\n", sg_flags, - ((SCSI_TUR != op->c2e) ? "TUR": "IO")); - console_mutex.unlock(); - } + if (op->verbose > 1) + pr2serr("sg_flags=0x%x, %s cmds\n", sg_flags, ((SCSI_TUR == op->c2e) ? + "TUR": ((SCSI_READ16 == op->c2e) ? "READ" : "WRITE"))); num = op->num_per_thread; for (k = 0, num_outstanding = 0; (k < num) || num_outstanding; @@ -462,7 +532,7 @@ work_thread(int id, struct opts_t * op) if ((num_outstanding < Q_PER_FD) && (k < num)) { do_inc = 1; pack_id = uniq_pack_id.fetch_add(1); - if (SCSI_TUR != op->c2e) { + if (is_rw) { if (free_lst.empty()) { lbp = get_aligned_heap(op->lb_sz); if (NULL == lbp) { @@ -475,94 +545,148 @@ work_thread(int id, struct opts_t * op) } } else lbp = NULL; - if (start_sg3_cmd(sg_fd, op->c2e, pack_id, - (ruip ? (uint64_t)(*ruip)() : op->lba), lbp, - op->lb_sz, sg_flags)) { + if (is_rw) { + if (ruip) { + lba = ruip->get(); + if (op->verbose > 3) + pr2serr(" start IO at lba=0x%" PRIx64 "\n", lba); + } else + lba = op->lba; + } else + lba = 0; + if (start_sg3_cmd(sg_fd, op->c2e, pack_id, lba, lbp, op->lb_sz, + sg_flags)) { err = "start_sg3_cmd()"; break; } ++thr_async_starts; ++num_outstanding; - pi_map[pack_id] = lbp; - /* check if any responses, don't wait */ - res = poll(&pfd, 1, 0); - if (res < 0) { - err = "poll(0) failed"; + pi_2_buff[pack_id] = lbp; + if (ruip) + pi_2_lba[pack_id] = lba; + } + num_to_read = 0; + if ((num_outstanding >= Q_PER_FD) || (k >= num)) { + /* full queue or finished injecting */ + num_waiting_read = 0; + if (ioctl(sg_fd, SG_GET_NUM_WAITING, &num_waiting_read) < 0) { + err = "ioctl(SG_GET_NUM_WAITING) failed"; break; } - } else { - /* check if any responses, wait as requested */ - res = poll(&pfd, 1, ((op->wait_ms > 0) ? op->wait_ms : 0)); - if (res < 0) { - err = "poll(wait_ms) failed"; - break; + if (1 == num_waiting_read) + num_to_read = num_waiting_read; + else if (num_waiting_read > 0) { + if (k >= num) + num_to_read = num_waiting_read; + else { + switch (op->mqd) { + case MQD_LOW: + num_to_read = num_waiting_read; + break; + case MQD_MEDIUM: + num_to_read = num_waiting_read / 2; + break; + case MQD_HIGH: + default: + num_to_read = 1; + break; + } + } + } else { + n = (op->wait_ms > 0) ? op->wait_ms : 0; + while (0 == (res = poll(pfd, 1, n))) { + if (res < 0) { + err = "poll(wait_ms) failed"; + break; + } + } + if (err) + break; + } + } else { /* not full, not finished injecting */ + if (MQD_HIGH == op->mqd) + num_to_read = 0; + else { + num_waiting_read = 0; + if (ioctl(sg_fd, SG_GET_NUM_WAITING, &num_waiting_read) < 0) { + err = "ioctl(SG_GET_NUM_WAITING) failed"; + break; + } + if (num_waiting_read > 0) + num_to_read = num_waiting_read / + ((MQD_LOW == op->mqd) ? 1 : 2); + else + num_to_read = 0; } } - if (0 == res) - continue; - while (res-- > 0) { + + while (num_to_read-- > 0) { if (finish_sg3_cmd(sg_fd, op->c2e, pack_id, op->wait_ms, thr_eagain_count)) { err = "finish_sg3_cmd()"; + if (ruip && (pack_id > 0)) { + auto q = pi_2_lba.find(pack_id); + + if (q != pi_2_lba.end()) { + snprintf(ebuff, sizeof(ebuff), "%s: lba=0x%" PRIx64 , + err, q->second); + err = ebuff; + } + } break; } ++thr_async_finishes; --num_outstanding; - auto p = pi_map.find(pack_id); + auto p = pi_2_buff.find(pack_id); - if (p == pi_map.end()) { + if (p == pi_2_buff.end()) { snprintf(ebuff, sizeof(ebuff), "pack_id=%d from " "finish_sg3_cmd() not found\n", pack_id); - err = ebuff; - break; + if (! err) + err = ebuff; } else { lbp = p->second; - pi_map.erase(p); + pi_2_buff.erase(p); if (lbp) free_lst.push_front(lbp); } + if (ruip && (pack_id > 0)) { + auto q = pi_2_lba.find(pack_id); + + if (q != pi_2_lba.end()) + pi_2_lba.erase(q); + } + if (err) + break; } + if (err) + break; } + close(sg_fd); // sg driver will handle any commands "in flight" if (ruip) delete ruip; - close(sg_fd); // sg driver will handle any commands "in flight" - if (err || (k < num) || (op->verbose > 0)) { - console_mutex.lock(); - if (k < num) { - cerr << "thread id=" << id << " FAILed at iteration: " << k; - if (err) - cerr << ", Reason: " << err << endl; - else - cerr << endl; - } else { - if (err) - cerr << "thread id=" << id << " FAILed on last, " << - "Reason: " << err << endl; - else - cerr << "thread id=" << id << " normal exit" << '\n'; - } - console_mutex.unlock(); - } - n = pi_map.size(); - if (n > 0) { - console_mutex.lock(); - cerr << "thread id=" << id << " Still " << n << " elements " << - "in pack_id map on exit" << endl; - console_mutex.unlock(); + if (err || (k < num)) { + if (k < num) + pr2serr("thread id=%d FAILed at iteration %d%s%s\n", id, k, + (err ? ", Reason: " : ""), (err ? err : "")); + else + pr2serr("thread id=%d FAILed on last%s%s\n", id, + (err ? ", Reason: " : ""), (err ? err : "")); } + n = pi_2_buff.size(); + if (n > 0) + pr2serr("thread id=%d Still %d elements in pi_2_buff map on exit\n", + id, n); for (k = 0; ! free_lst.empty(); ++k) { lbp = free_lst.back(); free_lst.pop_back(); if (lbp) free(lbp); } - if ((op->verbose > 2) && (k > 0)) { - console_mutex.lock(); - cerr << "thread id=" << id << - " Maximum number of READ/WRITEs queued: " << k << endl; - console_mutex.unlock(); - } + if ((op->verbose > 2) && (k > 0)) + pr2serr("thread id=%d Maximum number of READ/WRITEs queued: %d\n", + id, k); async_starts += thr_async_starts; async_finishes += thr_async_finishes; eagain_count += thr_eagain_count; @@ -677,6 +801,7 @@ main(int argc, char * argv[]) op->c2e = SCSI_TUR; op->bqd = BQ_DEFAULT; op->block = !! DEF_BLOCKING; + op->mqd = MQD_HIGH; page_size = sysconf(_SC_PAGESIZE); for (k = 1; k < argc; ++k) { @@ -699,12 +824,12 @@ main(int argc, char * argv[]) cp = strchr(argv[k], ','); if (cp) { ll = sg_get_llnum(cp + 1); - if ((-1 == ll) || (ll >= (UINT_MAX - 1))) { - fprintf(stderr, "could not decode hi_lba, or too " - "large\n"); + if ((-1 == ll) || (ll > UINT_MAX)) { + fprintf(stderr, "could not decode hi_lba, or > " + "UINT_MAX\n"); return 1; } else - op->hi_lba = (unsigned int)ll + 1; + op->hi_lba = (unsigned int)ll; } } else break; @@ -725,6 +850,17 @@ main(int argc, char * argv[]) else if (1 == n) op->bqd = BQ_AT_TAIL; } + } else if (0 == memcmp("-Q", argv[k], 2)) { + ++k; + if ((k < argc) && isdigit(*argv[k])) { + n = atoi(argv[k]); + if (0 == n) + op->mqd = MQD_LOW; + else if (1 == n) + op->mqd = MQD_MEDIUM; + else if (2 == n) + op->mqd = MQD_HIGH; + } } else if (0 == memcmp("-R", argv[k], 2)) op->c2e = SCSI_READ16; else if (0 == memcmp("-s", argv[k], 2)) { @@ -784,7 +920,7 @@ main(int argc, char * argv[]) usage(); return 1; } - if (op->hi_lba && (op->lba >= op->hi_lba)) { + if (op->hi_lba && (op->lba > op->hi_lba)) { cerr << "lba,hi_lba range is illegal" << endl; return 1; } @@ -827,6 +963,7 @@ main(int argc, char * argv[]) vector<thread *> vt; + /* start multi-threaded section */ for (k = 0; k < num_threads; ++k) { thread * tp = new thread {work_thread, k, op}; vt.push_back(tp); @@ -835,6 +972,7 @@ main(int argc, char * argv[]) // g++ 4.7.3 didn't like range-for loop here for (k = 0; k < (int)vt.size(); ++k) vt[k]->join(); + /* end multi-threaded section, just this main thread left */ for (k = 0; k < (int)vt.size(); ++k) delete vt[k]; |