aboutsummaryrefslogtreecommitdiff
path: root/src/sgp_dd.c
diff options
context:
space:
mode:
authorDouglas Gilbert <dgilbert@interlog.com>2021-10-28 23:35:11 +0000
committerDouglas Gilbert <dgilbert@interlog.com>2021-10-28 23:35:11 +0000
commitf0195003bb0c66ba55084b2f7e0fe982f08c5675 (patch)
treeac27a2f679ce91e865fa0a578c9cfa2d97110b07 /src/sgp_dd.c
parentc1ce6d6c5f3f64038e17dfd65380aa728c5d994d (diff)
downloadsg3_utils-f0195003bb0c66ba55084b2f7e0fe982f08c5675.tar.gz
sg_dd: 'iflag=00,ff' places the 32 bit block address (big endian) into each block; sgp_dd: major rework, new: --chkaddr which checks for block address in each block
git-svn-id: https://svn.bingwo.ca/repos/sg3_utils/trunk@916 6180dd3e-e324-4e3e-922d-17de1ae2f315
Diffstat (limited to 'src/sgp_dd.c')
-rw-r--r--src/sgp_dd.c490
1 files changed, 271 insertions, 219 deletions
diff --git a/src/sgp_dd.c b/src/sgp_dd.c
index 503df157..adf4c2ec 100644
--- a/src/sgp_dd.c
+++ b/src/sgp_dd.c
@@ -84,7 +84,7 @@
#include "sg_pr2serr.h"
-static const char * version_str = "5.78 20210601";
+static const char * version_str = "5.82 20211027";
#define DEF_BLOCK_SIZE 512
#define DEF_BLOCKS_PER_TRANSFER 128
@@ -140,51 +140,54 @@ struct flags_t {
bool mmap;
};
-typedef struct request_collection
+struct opts_t
{ /* one instance visible to all threads */
int infd;
int64_t skip;
int in_type;
int cdbsz_in;
struct flags_t in_flags;
- int64_t in_blk; /* -\ next block address to read */
- int64_t in_count; /* | blocks remaining for next read */
- int64_t in_rem_count; /* | count of remaining in blocks */
- int in_partial; /* | */
- bool in_stop; /* | */
- pthread_mutex_t in_mutex; /* -/ */
+ int64_t in_blk; /* next block address to read */
+ int64_t in_count; /* blocks remaining for next read */
+ int64_t in_rem_count; /* count of remaining in blocks */
+ int in_partial;
+ pthread_mutex_t inout_mutex;
int outfd;
int64_t seek;
int out_type;
int cdbsz_out;
struct flags_t out_flags;
- int64_t out_blk; /* -\ next block address to write */
- int64_t out_count; /* | blocks remaining for next write */
- int64_t out_rem_count; /* | count of remaining out blocks */
- int out_partial; /* | */
- bool out_stop; /* | */
- pthread_mutex_t out_mutex; /* | */
- pthread_cond_t out_sync_cv; /* -/ hold writes until "in order" */
+ int64_t out_blk; /* next block address to write */
+ int64_t out_count; /* blocks remaining for next write */
+ int64_t out_rem_count; /* count of remaining out blocks */
+ int out_partial;
+ pthread_cond_t out_sync_cv;
int bs;
int bpt;
int num_threads;
- int dio_incomplete_count; /* -\ */
- int sum_of_resids; /* | */
- pthread_mutex_t aux_mutex; /* -/ (also serializes some printf()s */
+ int dio_incomplete_count;
+ int sum_of_resids;
bool mmap_active;
+ int chkaddr; /* check read data contains 4 byte, big endian block
+ * addresses, once: check only 4 bytes per block */
+ int progress; /* accept --progress or -p, does nothing */
int debug;
int dry_run;
-} Rq_coll;
+};
typedef struct thread_arg
{ /* pointer to this argument passed to thread */
int id;
- Rq_coll * clp;
+ int64_t seek_skip;
+ struct opts_t * clp;
} Thread_arg;
typedef struct request_element
{ /* one instance per worker thread */
bool wr;
+ bool in_stop;
+ bool in_err;
+ bool out_err;
int infd;
int outfd;
int64_t blk;
@@ -210,10 +213,12 @@ static pthread_t sig_listen_thread_id;
static const char * proc_allow_dio = "/proc/scsi/sg/allow_dio";
-static void sg_in_operation(Rq_coll * clp, Rq_elem * rep);
-static void sg_out_operation(Rq_coll * clp, Rq_elem * rep);
-static bool normal_in_operation(Rq_coll * clp, Rq_elem * rep, int blocks);
-static void normal_out_operation(Rq_coll * clp, Rq_elem * rep, int blocks);
+static void sg_in_operation(struct opts_t * clp, Rq_elem * rep);
+static void sg_out_operation(struct opts_t * clp, Rq_elem * rep,
+ bool bump_out_blk);
+static void normal_in_operation(struct opts_t * clp, Rq_elem * rep, int blocks);
+static void normal_out_operation(struct opts_t * clp, Rq_elem * rep, int blocks,
+ bool bump_out_blk);
static int sg_start_io(Rq_elem * rep);
static int sg_finish_io(bool wr, Rq_elem * rep, pthread_mutex_t * a_mutp);
@@ -222,12 +227,18 @@ static int sg_finish_io(bool wr, Rq_elem * rep, pthread_mutex_t * a_mutp);
/* Assume initialized to 0, but want to start at 1, hence adding 1 in macro */
static atomic_uint ascending_val;
+static atomic_uint num_eintr;
+static atomic_uint num_eagain;
+static atomic_uint num_ebusy;
+static atomic_bool exit_threads;
+
#define GET_NEXT_PACK_ID(_v) (atomic_fetch_add(&ascending_val, _v) + (_v))
#else
static pthread_mutex_t av_mut = PTHREAD_MUTEX_INITIALIZER;
static int ascending_val = 1;
+static volatile bool exit_threads;
static unsigned int
GET_NEXT_PACK_ID(unsigned int val)
@@ -253,7 +264,7 @@ static Thread_arg thr_arg_a[MAX_NUM_THREADS];
static bool shutting_down = false;
static bool do_sync = false;
static bool do_time = false;
-static Rq_coll rcoll;
+static struct opts_t my_opts;
static struct timeval start_tm;
static int64_t dd_count = -1;
static int exit_status = 0;
@@ -278,7 +289,7 @@ calc_duration_throughput(int contin)
}
a = res_tm.tv_sec;
a += (0.000001 * res_tm.tv_usec);
- b = (double)rcoll.bs * (dd_count - rcoll.out_rem_count);
+ b = (double)my_opts.bs * (dd_count - my_opts.out_rem_count);
pr2serr("time to transfer data %s %d.%06d secs",
(contin ? "so far" : "was"), (int)res_tm.tv_sec,
(int)res_tm.tv_usec);
@@ -293,16 +304,16 @@ print_stats(const char * str)
{
int64_t infull, outfull;
- if (0 != rcoll.out_rem_count)
+ if (0 != my_opts.out_rem_count)
pr2serr(" remaining block count=%" PRId64 "\n",
- rcoll.out_rem_count);
- infull = dd_count - rcoll.in_rem_count;
+ my_opts.out_rem_count);
+ infull = dd_count - my_opts.in_rem_count;
pr2serr("%s%" PRId64 "+%d records in\n", str,
- infull - rcoll.in_partial, rcoll.in_partial);
+ infull - my_opts.in_partial, my_opts.in_partial);
- outfull = dd_count - rcoll.out_rem_count;
+ outfull = dd_count - my_opts.out_rem_count;
pr2serr("%s%" PRId64 "+%d records out\n", str,
- outfull - rcoll.out_partial, rcoll.out_partial);
+ outfull - my_opts.out_partial, my_opts.out_partial);
}
static void
@@ -451,6 +462,7 @@ usage()
" time 0->no timing(def), 1->time plus calculate "
"throughput\n"
" verbose same as 'deb=VERB': increase verbosity\n"
+ " --chkaddr|-c check read data contains blk address\n"
" --dry-run|-d prepare but bypass copy/read\n"
" --help|-h output this usage message then exit\n"
" --verbose|-v increase verbosity of utility\n"
@@ -459,29 +471,6 @@ usage()
"specialized for SCSI devices, uses multiple POSIX threads\n");
}
-static void
-guarded_stop_in(Rq_coll * clp)
-{
- pthread_mutex_lock(&clp->in_mutex);
- clp->in_stop = true;
- pthread_mutex_unlock(&clp->in_mutex);
-}
-
-static void
-guarded_stop_out(Rq_coll * clp)
-{
- pthread_mutex_lock(&clp->out_mutex);
- clp->out_stop = true;
- pthread_mutex_unlock(&clp->out_mutex);
-}
-
-static void
-guarded_stop_both(Rq_coll * clp)
-{
- guarded_stop_in(clp);
- guarded_stop_out(clp);
-}
-
static int
sgp_mem_mmap(int fd, int res_sz, uint8_t ** mmpp)
{
@@ -576,7 +565,7 @@ read_blkdev_capacity(int sg_fd, int64_t * num_sect, int * sect_sz)
static void *
sig_listen_thread(void * v_clp)
{
- Rq_coll * clp = (Rq_coll *)v_clp;
+ struct opts_t * clp = (struct opts_t *)v_clp;
int sig_number;
while (1) {
@@ -585,7 +574,11 @@ sig_listen_thread(void * v_clp)
break;
if (SIGINT == sig_number) {
pr2serr("%sinterrupted by SIGINT\n", my_name);
- guarded_stop_both(clp);
+#ifdef HAVE_C11_ATOMICS
+ atomic_store(&exit_threads, true);
+#else
+ exit_threads = true;
+#endif
pthread_cond_broadcast(&clp->out_sync_cv);
}
}
@@ -595,24 +588,20 @@ sig_listen_thread(void * v_clp)
static void
cleanup_in(void * v_clp)
{
- Rq_coll * clp = (Rq_coll *)v_clp;
+ struct opts_t * clp = (struct opts_t *)v_clp;
pr2serr("thread cancelled while in mutex held\n");
- clp->in_stop = true;
- pthread_mutex_unlock(&clp->in_mutex);
- guarded_stop_out(clp);
+ pthread_mutex_unlock(&clp->inout_mutex);
pthread_cond_broadcast(&clp->out_sync_cv);
}
static void
cleanup_out(void * v_clp)
{
- Rq_coll * clp = (Rq_coll *)v_clp;
+ struct opts_t * clp = (struct opts_t *)v_clp;
pr2serr("thread cancelled while out mutex held\n");
- clp->out_stop = true;
- pthread_mutex_unlock(&clp->out_mutex);
- guarded_stop_in(clp);
+ pthread_mutex_unlock(&clp->inout_mutex);
pthread_cond_broadcast(&clp->out_sync_cv);
}
@@ -693,28 +682,33 @@ static void *
read_write_thread(void * v_tap)
{
Thread_arg * tap = (Thread_arg *)v_tap;
- Rq_coll * clp = tap->clp;
+ struct opts_t * clp;
Rq_elem rel;
Rq_elem * rep = &rel;
- int sz;
- volatile bool stop_after_write = false;
- int64_t seek_skip;
+ volatile bool stop_after_write, b;
+ bool enforce_write_ordering;
+ int sz, c_addr;
+ int64_t out_blk, out_count;
+ int64_t seek_skip = tap->seek_skip;
int blocks, status;
- sz = clp->bpt * clp->bs;
- seek_skip = clp->seek - clp->skip;
- memset(rep, 0, sizeof(Rq_elem));
+ stop_after_write = false;
+ clp = tap->clp;
+ enforce_write_ordering = (FT_DEV_NULL != clp->out_type) &&
+ (FT_SG != clp->out_type);
+ c_addr = clp->chkaddr;
+ memset(rep, 0, sizeof(*rep));
/* Following clp members are constant during lifetime of thread */
rep->bs = clp->bs;
if ((clp->num_threads > 1) && clp->mmap_active) {
/* sg devices need separate file descriptor */
if (clp->in_flags.mmap && (FT_SG == clp->in_type)) {
- rep->infd = sg_in_open(infn, &clp->in_flags, clp->bs, clp->bpt);
+ rep->infd = sg_in_open(infn, &clp->in_flags, rep->bs, clp->bpt);
if (rep->infd < 0) err_exit(-rep->infd, "error opening infn");
} else
rep->infd = clp->infd;
if (clp->out_flags.mmap && (FT_SG == clp->out_type)) {
- rep->outfd = sg_out_open(outfn, &clp->out_flags, clp->bs,
+ rep->outfd = sg_out_open(outfn, &clp->out_flags, rep->bs,
clp->bpt);
if (rep->outfd < 0) err_exit(-rep->outfd, "error opening outfn");
@@ -724,6 +718,7 @@ read_write_thread(void * v_tap)
rep->infd = clp->infd;
rep->outfd = clp->outfd;
}
+ sz = clp->bpt * rep->bs;
rep->debug = clp->debug;
rep->cdbsz_in = clp->cdbsz_in;
rep->cdbsz_out = clp->cdbsz_out;
@@ -742,12 +737,19 @@ read_write_thread(void * v_tap)
}
while(1) {
- status = pthread_mutex_lock(&clp->in_mutex);
- if (0 != status) err_exit(status, "lock in_mutex");
- if (clp->in_stop || (clp->in_count <= 0)) {
+ if ((rep->in_stop) || (rep->in_err) || (rep->out_err))
+ break;
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
+#ifdef HAVE_C11_ATOMICS
+ b = atomic_load(&exit_threads);
+#else
+ b = exit_threads;
+#endif
+ if (b || (clp->in_count <= 0)) {
/* no more to do, exit loop then thread */
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
break;
}
blocks = (clp->in_count > clp->bpt) ? clp->bpt : clp->in_count;
@@ -756,117 +758,151 @@ read_write_thread(void * v_tap)
rep->num_blks = blocks;
clp->in_blk += blocks;
clp->in_count -= blocks;
+ /* while we have this lock, find corresponding out_blk */
+ out_blk = rep->blk + seek_skip;
+ out_count = clp->out_count;
+ if (! enforce_write_ordering)
+ clp->out_blk += blocks;
+ clp->out_count -= blocks;
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
pthread_cleanup_push(cleanup_in, (void *)clp);
if (FT_SG == clp->in_type)
- sg_in_operation(clp, rep); /* lets go of in_mutex mid operation */
- else {
- stop_after_write = normal_in_operation(clp, rep, blocks);
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
+ sg_in_operation(clp, rep);
+ else
+ normal_in_operation(clp, rep, blocks);
+ if (c_addr && (rep->bs > 3)) {
+ int k, j, off, num;
+ uint32_t addr = (uint32_t)rep->blk;
+
+ num = (1 == c_addr) ? 4 : (rep->bs - 3);
+ for (k = 0, off = 0; k < blocks; ++k, ++addr, off += rep->bs) {
+ for (j = 0; j < num; j += 4) {
+ if (addr != sg_get_unaligned_be32(rep->buffp + off + j))
+ break;
+ }
+ if (j < num)
+ break;
+ }
+ if (k < blocks) {
+ pr2serr("%s: chkaddr failure at addr=0x%x\n", __func__, addr);
+ rep->in_err = true;
+ }
}
pthread_cleanup_pop(0);
+ if (rep->in_err) {
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
+ /* write-side not done, so undo changes to out_blk + out_count */
+ if (! enforce_write_ordering)
+ clp->out_blk -= blocks;
+ clp->out_count += blocks;
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
+ break;
+ }
- status = pthread_mutex_lock(&clp->out_mutex);
- if (0 != status) err_exit(status, "lock out_mutex");
- if (FT_DEV_NULL != clp->out_type) {
- while ((! clp->out_stop) &&
- ((rep->blk + seek_skip) != clp->out_blk)) {
+ if (enforce_write_ordering) {
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
+#ifdef HAVE_C11_ATOMICS
+ b = atomic_load(&exit_threads);
+#else
+ b = exit_threads;
+#endif
+ while ((! b) && (out_blk != clp->out_blk)) {
/* if write would be out of sequence then wait */
pthread_cleanup_push(cleanup_out, (void *)clp);
- status = pthread_cond_wait(&clp->out_sync_cv, &clp->out_mutex);
+ status = pthread_cond_wait(&clp->out_sync_cv,
+ &clp->inout_mutex);
if (0 != status) err_exit(status, "cond out_sync_cv");
pthread_cleanup_pop(0);
}
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
}
- if (clp->out_stop || (clp->out_count <= 0)) {
- if (! clp->out_stop)
- clp->out_stop = true;
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
+#ifdef HAVE_C11_ATOMICS
+ b = atomic_load(&exit_threads);
+#else
+ b = exit_threads;
+#endif
+ if (b || (out_count <= 0))
break;
- }
- if (stop_after_write)
- clp->out_stop = true;
+
rep->wr = true;
- rep->blk = clp->out_blk;
- clp->out_blk += blocks;
- clp->out_count -= blocks;
+ rep->blk = out_blk;
if (0 == rep->num_blks) {
- clp->out_stop = true;
- stop_after_write = true;
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
break; /* read nothing so leave loop */
}
pthread_cleanup_push(cleanup_out, (void *)clp);
if (FT_SG == clp->out_type)
- sg_out_operation(clp, rep); /* releases out_mutex mid operation */
+ sg_out_operation(clp, rep, enforce_write_ordering);
else if (FT_DEV_NULL == clp->out_type) {
/* skip actual write operation */
clp->out_rem_count -= blocks;
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
- }
- else {
- normal_out_operation(clp, rep, blocks);
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
}
+ else
+ normal_out_operation(clp, rep, blocks, enforce_write_ordering);
pthread_cleanup_pop(0);
- if (stop_after_write)
- break;
- pthread_cond_broadcast(&clp->out_sync_cv);
+ if (enforce_write_ordering)
+ pthread_cond_broadcast(&clp->out_sync_cv);
} /* end of while loop */
+
if (rep->alloc_bp)
free(rep->alloc_bp);
- status = pthread_mutex_lock(&clp->in_mutex);
- if (0 != status) err_exit(status, "lock in_mutex");
- if (! clp->in_stop)
- clp->in_stop = true; /* flag other workers to stop */
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
+ if (rep->in_err || rep->out_err) {
+ stop_after_write = true;
+#ifdef HAVE_C11_ATOMICS
+ if (! atomic_load(&exit_threads))
+ atomic_store(&exit_threads, true);
+#else
+ if (! exit_threads)
+ exit_threads = true;
+#endif
+ }
pthread_cond_broadcast(&clp->out_sync_cv);
- return stop_after_write ? NULL : clp;
+ return (stop_after_write || rep->in_stop) ? NULL : clp;
}
-static bool
-normal_in_operation(Rq_coll * clp, Rq_elem * rep, int blocks)
+static void
+normal_in_operation(struct opts_t * clp, Rq_elem * rep, int blocks)
{
- bool stop_after_write = false;
- int res;
+ int res, status;
char strerr_buff[STRERR_BUFF_LEN + 1];
- /* enters holding in_mutex */
- while (((res = read(rep->infd, rep->buffp, blocks * clp->bs)) < 0) &&
+ while (((res = read(rep->infd, rep->buffp, blocks * rep->bs)) < 0) &&
((EINTR == errno) || (EAGAIN == errno)))
;
if (res < 0) {
- if (clp->in_flags.coe) {
+ if (rep->in_flags.coe) {
memset(rep->buffp, 0, rep->num_blks * rep->bs);
pr2serr(">> substituted zeros for in blk=%" PRId64 " for %d "
"bytes, %s\n", rep->blk,
rep->num_blks * rep->bs,
tsafe_strerror(errno, strerr_buff));
- res = rep->num_blks * clp->bs;
+ res = rep->num_blks * rep->bs;
}
else {
pr2serr("error in normal read, %s\n",
tsafe_strerror(errno, strerr_buff));
- clp->in_stop = true;
- guarded_stop_out(clp);
- return 1;
+ rep->in_stop = true;
+ rep->in_err = true;
+ return;
}
}
- if (res < blocks * clp->bs) {
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
+ if (res < blocks * rep->bs) {
int o_blocks = blocks;
- stop_after_write = true;
- blocks = res / clp->bs;
- if ((res % clp->bs) > 0) {
+
+ rep->in_stop = true;
+ blocks = res / rep->bs;
+ if ((res % rep->bs) > 0) {
blocks++;
clp->in_partial++;
}
@@ -878,43 +914,49 @@ normal_in_operation(Rq_coll * clp, Rq_elem * rep, int blocks)
clp->in_count -= blocks;
}
clp->in_rem_count -= blocks;
- return stop_after_write;
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
}
static void
-normal_out_operation(Rq_coll * clp, Rq_elem * rep, int blocks)
+normal_out_operation(struct opts_t * clp, Rq_elem * rep, int blocks,
+ bool bump_out_blk)
{
- int res;
+ int res, status;
char strerr_buff[STRERR_BUFF_LEN + 1];
- /* enters holding out_mutex */
- while (((res = write(rep->outfd, rep->buffp, rep->num_blks * clp->bs))
+ while (((res = write(rep->outfd, rep->buffp, rep->num_blks * rep->bs))
< 0) && ((EINTR == errno) || (EAGAIN == errno)))
;
if (res < 0) {
- if (clp->out_flags.coe) {
+ if (rep->out_flags.coe) {
pr2serr(">> ignored error for out blk=%" PRId64 " for %d bytes, "
"%s\n", rep->blk, rep->num_blks * rep->bs,
tsafe_strerror(errno, strerr_buff));
- res = rep->num_blks * clp->bs;
+ res = rep->num_blks * rep->bs;
}
else {
pr2serr("error normal write, %s\n",
tsafe_strerror(errno, strerr_buff));
- guarded_stop_in(clp);
- clp->out_stop = true;
+ rep->out_err = true;
return;
}
}
- if (res < blocks * clp->bs) {
- blocks = res / clp->bs;
- if ((res % clp->bs) > 0) {
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
+ if (res < blocks * rep->bs) {
+ blocks = res / rep->bs;
+ if ((res % rep->bs) > 0) {
blocks++;
clp->out_partial++;
}
rep->num_blks = blocks;
}
clp->out_rem_count -= blocks;
+ if (bump_out_blk)
+ clp->out_blk += blocks;
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
}
static int
@@ -988,12 +1030,11 @@ sg_build_scsi_cdb(uint8_t * cdbp, int cdb_sz, unsigned int blocks,
}
static void
-sg_in_operation(Rq_coll * clp, Rq_elem * rep)
+sg_in_operation(struct opts_t * clp, Rq_elem * rep)
{
int res;
int status;
- /* enters holding in_mutex */
while (1) {
res = sg_start_io(rep);
if (1 == res)
@@ -1001,31 +1042,25 @@ sg_in_operation(Rq_coll * clp, Rq_elem * rep)
else if (res < 0) {
pr2serr("%sinputting to sg failed, blk=%" PRId64 "\n", my_name,
rep->blk);
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
- guarded_stop_both(clp);
+ rep->in_stop = true;
+ rep->in_err = true;
return;
}
- /* Now release in mutex to let other reads run in parallel */
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
-
- res = sg_finish_io(rep->wr, rep, &clp->aux_mutex);
+ res = sg_finish_io(rep->wr, rep, &clp->inout_mutex);
switch (res) {
case SG_LIB_CAT_ABORTED_COMMAND:
case SG_LIB_CAT_UNIT_ATTENTION:
/* try again with same addr, count info */
/* now re-acquire in mutex for balance */
/* N.B. This re-read could now be out of read sequence */
- status = pthread_mutex_lock(&clp->in_mutex);
- if (0 != status) err_exit(status, "lock in_mutex");
break;
case SG_LIB_CAT_MEDIUM_HARD:
- if (0 == clp->in_flags.coe) {
+ if (0 == rep->in_flags.coe) {
pr2serr("error finishing sg in command (medium)\n");
if (exit_status <= 0)
exit_status = res;
- guarded_stop_both(clp);
+ rep->in_stop = true;
+ rep->in_err = true;
return;
} else {
memset(rep->buffp, 0, rep->num_blks * rep->bs);
@@ -1039,19 +1074,15 @@ sg_in_operation(Rq_coll * clp, Rq_elem * rep)
#endif
#endif
case 0:
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
if (rep->dio_incomplete_count || rep->resid) {
- status = pthread_mutex_lock(&clp->aux_mutex);
- if (0 != status) err_exit(status, "lock aux_mutex");
clp->dio_incomplete_count += rep->dio_incomplete_count;
clp->sum_of_resids += rep->resid;
- status = pthread_mutex_unlock(&clp->aux_mutex);
- if (0 != status) err_exit(status, "unlock aux_mutex");
}
- status = pthread_mutex_lock(&clp->in_mutex);
- if (0 != status) err_exit(status, "lock in_mutex");
clp->in_rem_count -= rep->num_blks;
- status = pthread_mutex_unlock(&clp->in_mutex);
- if (0 != status) err_exit(status, "unlock in_mutex");
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
return;
case SG_LIB_CAT_ILLEGAL_REQ:
if (clp->debug)
@@ -1061,19 +1092,19 @@ sg_in_operation(Rq_coll * clp, Rq_elem * rep)
pr2serr("error finishing sg in command (%d)\n", res);
if (exit_status <= 0)
exit_status = res;
- guarded_stop_both(clp);
+ rep->in_stop = true;
+ rep->in_err = true;
return;
}
- }
+ } /* end of while loop */
}
static void
-sg_out_operation(Rq_coll * clp, Rq_elem * rep)
+sg_out_operation(struct opts_t * clp, Rq_elem * rep, bool bump_out_blk)
{
int res;
int status;
- /* enters holding out_mutex */
while (1) {
res = sg_start_io(rep);
if (1 == res)
@@ -1081,31 +1112,23 @@ sg_out_operation(Rq_coll * clp, Rq_elem * rep)
else if (res < 0) {
pr2serr("%soutputting from sg failed, blk=%" PRId64 "\n",
my_name, rep->blk);
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
- guarded_stop_both(clp);
+ rep->out_err = true;
return;
}
- /* Now release in mutex to let other reads run in parallel */
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
-
- res = sg_finish_io(rep->wr, rep, &clp->aux_mutex);
+ res = sg_finish_io(rep->wr, rep, &clp->inout_mutex);
switch (res) {
case SG_LIB_CAT_ABORTED_COMMAND:
case SG_LIB_CAT_UNIT_ATTENTION:
/* try again with same addr, count info */
/* now re-acquire out mutex for balance */
/* N.B. This re-write could now be out of write sequence */
- status = pthread_mutex_lock(&clp->out_mutex);
- if (0 != status) err_exit(status, "lock out_mutex");
break;
case SG_LIB_CAT_MEDIUM_HARD:
- if (0 == clp->out_flags.coe) {
+ if (0 == rep->out_flags.coe) {
pr2serr("error finishing sg out command (medium)\n");
if (exit_status <= 0)
exit_status = res;
- guarded_stop_both(clp);
+ rep->out_err = true;
return;
} else
pr2serr(">> ignored error for out blk=%" PRId64 " for %d "
@@ -1117,29 +1140,27 @@ sg_out_operation(Rq_coll * clp, Rq_elem * rep)
#endif
#endif
case 0:
+ status = pthread_mutex_lock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "lock inout_mutex");
if (rep->dio_incomplete_count || rep->resid) {
- status = pthread_mutex_lock(&clp->aux_mutex);
- if (0 != status) err_exit(status, "lock aux_mutex");
clp->dio_incomplete_count += rep->dio_incomplete_count;
clp->sum_of_resids += rep->resid;
- status = pthread_mutex_unlock(&clp->aux_mutex);
- if (0 != status) err_exit(status, "unlock aux_mutex");
}
- status = pthread_mutex_lock(&clp->out_mutex);
- if (0 != status) err_exit(status, "lock out_mutex");
clp->out_rem_count -= rep->num_blks;
- status = pthread_mutex_unlock(&clp->out_mutex);
- if (0 != status) err_exit(status, "unlock out_mutex");
+ if (bump_out_blk)
+ clp->out_blk += rep->num_blks;
+ status = pthread_mutex_unlock(&clp->inout_mutex);
+ if (0 != status) err_exit(status, "unlock inout_mutex");
return;
case SG_LIB_CAT_ILLEGAL_REQ:
if (clp->debug)
sg_print_command_len(rep->cdb, rep->cdbsz_out);
/* FALL THROUGH */
default:
+ rep->out_err = true;
pr2serr("error finishing sg out command (%d)\n", res);
if (exit_status <= 0)
exit_status = res;
- guarded_stop_both(clp);
return;
}
}
@@ -1180,15 +1201,23 @@ sg_start_io(Rq_elem * rep)
if (mmap)
hp->flags |= SG_FLAG_MMAP_IO;
if (rep->debug > 8) {
- pr2serr("sg_start_io: SCSI %s, blk=%" PRId64 " num_blks=%d\n",
- rep->wr ? "WRITE" : "READ", rep->blk, rep->num_blks);
+ pr2serr("%s: SCSI %s, blk=%" PRId64 " num_blks=%d\n", __func__,
+ rep->wr ? "WRITE" : "READ", rep->blk, rep->num_blks);
sg_print_command(hp->cmdp);
}
while (((res = write(rep->wr ? rep->outfd : rep->infd, hp,
sizeof(struct sg_io_hdr))) < 0) &&
- ((EINTR == errno) || (EAGAIN == errno) || (EBUSY == errno)))
- ;
+ ((EINTR == errno) || (EAGAIN == errno) || (EBUSY == errno))) {
+#ifdef HAVE_C11_ATOMICS
+ if (EINTR == errno)
+ atomic_fetch_add(&num_eintr, 1);
+ else if (EAGAIN == errno)
+ atomic_fetch_add(&num_eagain, 1);
+ else
+ atomic_fetch_add(&num_ebusy, 1);
+#endif
+ }
if (res < 0) {
if (ENOMEM == errno)
return 1;
@@ -1240,21 +1269,22 @@ sg_finish_io(bool wr, Rq_elem * rep, pthread_mutex_t * a_mutp)
break;
case SG_LIB_CAT_ABORTED_COMMAND:
case SG_LIB_CAT_UNIT_ATTENTION:
- if (rep->debug > 8)
+ if (rep->debug)
sg_chk_n_print3((wr ? "writing": "reading"), hp, false);
return res;
case SG_LIB_CAT_NOT_READY:
default:
- {
+ rep->out_err = false;
+ if (rep->debug) {
char ebuff[EBUFF_SZ];
snprintf(ebuff, EBUFF_SZ, "%s blk=%" PRId64,
wr ? "writing": "reading", rep->blk);
status = pthread_mutex_lock(a_mutp);
- if (0 != status) err_exit(status, "lock aux_mutex");
+ if (0 != status) err_exit(status, "lock inout_mutex");
sg_chk_n_print3(ebuff, hp, false);
status = pthread_mutex_unlock(a_mutp);
- if (0 != status) err_exit(status, "unlock aux_mutex");
+ if (0 != status) err_exit(status, "unlock inout_mutex");
return res;
}
}
@@ -1268,7 +1298,7 @@ sg_finish_io(bool wr, Rq_elem * rep, pthread_mutex_t * a_mutp)
rep->dio_incomplete_count = 0;
rep->resid = hp->resid;
if (rep->debug > 8)
- pr2serr("sg_finish_io: completed %s\n", wr ? "WRITE" : "READ");
+ pr2serr("%s: completed %s\n", __func__, wr ? "WRITE" : "READ");
return 0;
}
@@ -1351,9 +1381,10 @@ main(int argc, char * argv[])
int res, k, err, keylen;
int64_t in_num_sect = 0;
int64_t out_num_sect = 0;
+ int64_t seek_skip;
int in_sect_sz, out_sect_sz, status, n, flags;
void * vp;
- Rq_coll * clp = &rcoll;
+ struct opts_t * clp = &my_opts;
char ebuff[EBUFF_SZ];
#if SG_LIB_ANDROID
struct sigaction actions;
@@ -1484,6 +1515,9 @@ main(int argc, char * argv[])
do_time = !! sg_get_num(buf);
else if ((keylen > 1) && ('-' == key[0]) && ('-' != key[1])) {
res = 0;
+ n = num_chs_in_str(key + 1, keylen - 1, 'c');
+ clp->chkaddr += n;
+ res += n;
n = num_chs_in_str(key + 1, keylen - 1, 'd');
clp->dry_run += n;
res += n;
@@ -1492,6 +1526,9 @@ main(int argc, char * argv[])
usage();
return 0;
}
+ n = num_chs_in_str(key + 1, keylen - 1, 'p');
+ clp->progress += n;
+ res += n;
n = num_chs_in_str(key + 1, keylen - 1, 'v');
if (n > 0)
verbose_given = true;
@@ -1507,14 +1544,18 @@ main(int argc, char * argv[])
key);
return SG_LIB_SYNTAX_ERROR;
}
- } else if ((0 == strncmp(key, "--dry-run", 9)) ||
+ } else if (0 == strncmp(key, "--chkaddr", 9))
+ ++clp->chkaddr;
+ else if ((0 == strncmp(key, "--dry-run", 9)) ||
(0 == strncmp(key, "--dry_run", 9)))
++clp->dry_run;
else if ((0 == strncmp(key, "--help", 6)) ||
(0 == strcmp(key, "-?"))) {
usage();
return 0;
- } else if (0 == strncmp(key, "--verb", 6)) {
+ } else if (0 == strncmp(key, "--prog", 6))
+ ++clp->progress;
+ else if (0 == strncmp(key, "--verb", 6)) {
verbose_given = true;
++clp->debug; /* --verbose */
} else if (0 == strncmp(key, "--vers", 6))
@@ -1584,7 +1625,7 @@ main(int argc, char * argv[])
usage();
return SG_LIB_SYNTAX_ERROR;
}
- if (clp->debug)
+ if (clp->debug > 2)
pr2serr("%sif=%s skip=%" PRId64 " of=%s seek=%" PRId64 " count=%"
PRId64 "\n", my_name, infn, skip, outfn, seek, dd_count);
@@ -1806,12 +1847,8 @@ main(int argc, char * argv[])
clp->out_rem_count = dd_count;
clp->seek = seek;
clp->out_blk = seek;
- status = pthread_mutex_init(&clp->in_mutex, NULL);
- if (0 != status) err_exit(status, "init in_mutex");
- status = pthread_mutex_init(&clp->out_mutex, NULL);
- if (0 != status) err_exit(status, "init out_mutex");
- status = pthread_mutex_init(&clp->aux_mutex, NULL);
- if (0 != status) err_exit(status, "init aux_mutex");
+ status = pthread_mutex_init(&clp->inout_mutex, NULL);
+ if (0 != status) err_exit(status, "init inout_mutex");
status = pthread_cond_init(&clp->out_sync_cv, NULL);
if (0 != status) err_exit(status, "init out_sync_cv");
@@ -1836,9 +1873,11 @@ main(int argc, char * argv[])
/* vvvvvvvvvvv Start worker threads vvvvvvvvvvvvvvvvvvvvvvvv */
if ((clp->out_rem_count > 0) && (clp->num_threads > 0)) {
/* Run 1 work thread to shake down infant retryable stuff */
- status = pthread_mutex_lock(&clp->out_mutex);
+ status = pthread_mutex_lock(&clp->inout_mutex);
if (0 != status) err_exit(status, "lock out_mutex");
+ seek_skip = clp->seek - clp->skip;
thr_arg_a[0].id = 0;
+ thr_arg_a[0].seek_skip = seek_skip;
thr_arg_a[0].clp = clp;
status = pthread_create(&threads[0], NULL, read_write_thread,
(void *)(thr_arg_a + 0));
@@ -1848,16 +1887,17 @@ main(int argc, char * argv[])
/* wait for any broadcast */
pthread_cleanup_push(cleanup_out, (void *)clp);
- status = pthread_cond_wait(&clp->out_sync_cv, &clp->out_mutex);
+ status = pthread_cond_wait(&clp->out_sync_cv, &clp->inout_mutex);
if (0 != status) err_exit(status, "cond out_sync_cv");
pthread_cleanup_pop(0);
- status = pthread_mutex_unlock(&clp->out_mutex);
+ status = pthread_mutex_unlock(&clp->inout_mutex);
if (0 != status) err_exit(status, "unlock out_mutex");
/* now start the rest of the threads */
for (k = 1; k < clp->num_threads; ++k) {
thr_arg_a[k].id = k;
+ thr_arg_a[k].seek_skip = seek_skip;
thr_arg_a[k].clp = clp;
status = pthread_create(&threads[k], NULL, read_write_thread,
(void *)(thr_arg_a + k));
@@ -1943,5 +1983,17 @@ fini:
if (clp->sum_of_resids)
pr2serr(">> Non-zero sum of residual counts=%d\n",
clp->sum_of_resids);
+#ifdef HAVE_C11_ATOMICS
+ {
+ unsigned int ui;
+
+ if ((ui = atomic_load(&num_eagain)) > 0)
+ pr2serr(">> number of IO call yielding EAGAIN %u\n", ui);
+ if ((ui = atomic_load(&num_ebusy)) > 0)
+ pr2serr(">> number of IO call yielding EBUSY %u\n", ui);
+ if ((ui = atomic_load(&num_eintr)) > 0)
+ pr2serr(">> number of IO call yielding EINTR %u\n", ui);
+ }
+#endif
return (res >= 0) ? res : SG_LIB_CAT_OTHER;
}