diff options
Diffstat (limited to 'testing/sg_mrq_dd.cpp')
-rw-r--r-- | testing/sg_mrq_dd.cpp | 1392 |
1 files changed, 811 insertions, 581 deletions
diff --git a/testing/sg_mrq_dd.cpp b/testing/sg_mrq_dd.cpp index 57382f4e..5f906028 100644 --- a/testing/sg_mrq_dd.cpp +++ b/testing/sg_mrq_dd.cpp @@ -30,7 +30,7 @@ * */ -static const char * version_str = "1.30 20210606"; +static const char * version_str = "1.31 20210620"; #define _XOPEN_SOURCE 600 #ifndef _GNU_SOURCE @@ -138,6 +138,7 @@ using namespace std; #define DEF_SCSI_CDB_SZ 10 #define MAX_SCSI_CDB_SZ 16 /* could be 32 */ #define PACK_ID_TID_MULTIPLIER (0x1000000) /* 16,777,216 */ +#define MAX_SLICES 16 /* number of IFILE,OFILE pairs */ #define SENSE_BUFF_LEN 64 /* Arbitrary, could be larger */ #define READ_CAP_REPLY_LEN 8 @@ -154,22 +155,20 @@ using namespace std; #define MAX_NUM_THREADS 1024 /* was SG_MAX_QUEUE with v3 driver */ #define DEF_MRQ_NUM 16 -#ifndef RAW_MAJOR -#define RAW_MAJOR 255 /*unlikely value */ -#endif - +#define FT_UNKNOWN 0 /* yet to be checked */ #define FT_OTHER 1 /* filetype other than one of the following */ #define FT_SG 2 /* filetype is sg char device */ -#define FT_RAW 4 /* filetype is raw char device */ -#define FT_DEV_NULL 8 /* either "/dev/null" or "." as filename */ -#define FT_ST 16 /* filetype is st char device (tape) */ -#define FT_BLOCK 32 /* filetype is a block device */ -#define FT_FIFO 64 /* fifo (named or unnamed pipe (stdout)) */ +#define FT_DEV_NULL 4 /* either /dev/null, /dev/zero, or "." */ +#define FT_ST 8 /* filetype is st char device (tape) */ +#define FT_BLOCK 16 /* filetype is a block device */ +#define FT_FIFO 32 /* fifo (named or unnamed pipe (stdout)) */ +#define FT_CHAR 64 /* fifo (named or unnamed pipe (stdout)) */ #define FT_RANDOM_0_FF 128 /* iflag=00, iflag=ff and iflag=random override if=IFILE */ #define FT_ERROR 256 /* couldn't "stat" file */ #define DEV_NULL_MINOR_NUM 3 +#define DEV_ZERO_MINOR_NUM 5 #define EBUFF_SZ 768 @@ -199,15 +198,44 @@ struct flags_t { bool qtail; bool random; bool serial; + bool same_fds; bool wq_excl; bool zero; int cdl; /* command duration limits, 0 --> no cdl */ int mmap; }; -typedef pair<int64_t, int> get_next_res; +typedef pair<int64_t, int> get_next_res_t; /* LBA, num */ typedef array<uint8_t, MAX_SCSI_CDB_SZ> cdb_arr_t; +struct cp_ver_pair_t { + cp_ver_pair_t() {} + + get_next_res_t get_next(int desired_num_blks); + + enum class my_state {empty, + init, + underway, + ignore, + finished} state = {my_state::empty}; + + int my_index = 0; + int in_fd = -1; + int in_type = FT_UNKNOWN; + int out_fd = -1; + int out_type = FT_UNKNOWN; + + int64_t dd_count = 0; + atomic<int64_t> next_count_pos {}; + atomic<int64_t> in_rem_count {}; + atomic<int64_t> out_rem_count {}; + atomic<int> in_partial {}; + atomic<int> out_partial {}; + atomic<int> sum_of_resids {}; +}; + +typedef array<cp_ver_pair_t, MAX_SLICES> cp_ver_arr_t; + /* There is one instance of this structure and it is at file scope so it is * initialized to zero. The design of this copy multi-threaded copy algorithm * attempts to have no locks on the fast path. Contention in gcoll.get_next() @@ -216,6 +244,8 @@ typedef array<uint8_t, MAX_SCSI_CDB_SZ> cdb_arr_t; * can occur at that point. */ struct global_collection /* one instance visible to all threads */ { + cp_ver_arr_t cp_ver_arr; + /* get_next() is the pivotal function for multi-threaded safety. It can * be safely called from all threads with the desired number of blocks * (typically mrq*bpt) and this function returns a pair. The first pair @@ -225,24 +255,20 @@ struct global_collection /* one instance visible to all threads */ * returned pair is 0 then the calling thread should shutdown; a * negative value indicates an error has occurred (e.g. in another * thread) and the calling thread should shutdown. */ - get_next_res get_next(int desired_num_blks); - atomic<int64_t> next_count_pos; - int infd; + int in0fd; int64_t dd_count; - int in_type; + int in_type; /* expect all IFILEs to have same type */ int cdbsz_in; int help; struct flags_t in_flags; - atomic<int64_t> in_rem_count; /* | count of remaining in blocks */ atomic<int> in_partial; /* | */ off_t in_st_size; /* Only for FT_OTHER (regular) file */ int mrq_num; /* if user gives 0, set this to 1 */ - int outfd; + int out0fd; int out_type; int cdbsz_out; struct flags_t out_flags; - atomic<int64_t> out_rem_count; /* | count of remaining out blocks */ atomic<int> out_partial; /* | */ off_t out_st_size; /* Only for FT_OTHER (regular) file */ condition_variable infant_cv; /* after thread:0 does first segment */ @@ -274,6 +300,8 @@ struct global_collection /* one instance visible to all threads */ bool unit_nanosec; /* default duration unit is millisecond */ bool verify; /* don't copy, verify like Unix: cmp */ bool prefetch; /* for verify: do PF(b),RD(a),V(b)_a_data */ + vector<string> inf_v; + vector<string> outf_v; const char * infp; const char * outfp; class scat_gath_list i_sgl; @@ -361,10 +389,6 @@ static sigset_t signal_set; static const char * proc_allow_dio = "/proc/scsi/sg/allow_dio"; -static int sg_in_open(struct global_collection *clp, const char *inf, - uint8_t **mmpp, int *mmap_len, bool move_data); -static int sg_out_open(struct global_collection *clp, const char *outf, - uint8_t **mmpp, int *mmap_len, bool move_data); static int do_both_sg_segment(Rq_elem * rep, scat_gath_iter & i_sg_it, scat_gath_iter & o_sg_it, int seg_blks, vector<cdb_arr_t> & a_cdb, @@ -423,6 +447,234 @@ pr2serr_lk(const char * fmt, ...) } static void +usage(int pg_num) +{ + if (pg_num > 4) + goto page5; + if (pg_num > 3) + goto page4; + else if (pg_num > 2) + goto page3; + else if (pg_num > 1) + goto page2; + + pr2serr("Usage: sg_mrq_dd [bs=BS] [conv=CONV] [count=COUNT] [ibs=BS] " + "[if=IFILE*]\n" + " [iflag=FLAGS] [obs=BS] [of=OFILE*] " + "[oflag=FLAGS]\n" + " [seek=SEEK] [skip=SKIP] [--help] [--verify] " + "[--version]\n\n"); + pr2serr(" [bpt=BPT] [cdbsz=6|10|12|16] [cdl=CDL] " + "[dio=0|1]\n" + " [elemsz_kb=EKB] [ese=0|1] [fua=0|1|2|3] " + "[hipri=NRQS]\n" + " [mrq=NRQS] [ofreg=OFREG] [sdt=SDT] " + "[sync=0|1]\n" + " [thr=THR] [time=0|1|2[,TO]] [verbose=VERB] " + "[--dry-run]\n" + " [--pre-fetch] [--verbose] [--version]\n\n" + " where: operands have the form name=value and are pecular to " + "'dd'\n" + " style commands, and options start with one or " + "two hyphens;\n" + " the main operands and options (shown in first group " + "above) are:\n" + " bs must be device logical block size (default " + "512)\n" + " conv comma separated list from: [nocreat,noerror," + "notrunc,\n" + " null,sync]\n" + " count number of blocks to copy (def: device size)\n" + " if file(s) or device(s) to read from (def: " + "stdin)\n" + " iflag comma separated list from: [00,coe,dio," + "direct,dpo,\n" + " dsync,excl,ff,fua,masync,mmap,mout_if,nodur," + "null,\n" + " order,qhead,qtail,random,same_fds,serial," + "wq_excl]\n" + " of file(s) or device(s) to write to (def: " + "/dev/null)\n" + " 'of=.' also outputs to /dev/null\n" + " oflag comma separated list from: [append,nocreat,\n" + " <<list from iflag>>]\n" + " seek block position to start writing to OFILE\n" + " skip block position to start reading from IFILE\n" + " --help|-h output this usage message then exit\n" + " --verify|-x do a verify (compare) operation [def: do a " + "copy]\n" + " --version|-V output version string then exit\n\n" + "Copy IFILE to OFILE, similar to dd command. A comma separated " + "list of files\n may be given for IFILE*, ditto for OFILE*. " + "This utility is specialized for\nSCSI devices and uses the " + "'multiple requests' (mrq) in a single invocation\nfacility in " + "version 4 of the sg driver unless mrq=0. Usually one or both\n" + "IFILE and OFILE will be sg devices. With the --verify option " + "it does a\nverify/compare operation instead of a copy. This " + "utility is Linux specific.\nUse '-hh', '-hhh', '-hhhh' or " + "'-hhhhh' for more information.\n" + ); + return; +page2: + pr2serr("Syntax: sg_mrq_dd [operands] [options]\n\n" + " the lesser used operands and option are:\n\n" + " bpt is blocks_per_transfer (default is 128)\n" + " cdbsz size of SCSI READ, WRITE or VERIFY cdb_s " + "(default is 10)\n" + " cdl command duration limits value 0 to 7 (def: " + "0 (no cdl))\n" + " dio is direct IO, 1->attempt, 0->indirect IO (def)\n" + " elemsz_kb=EKB scatter gather list element size in " + "kibibytes;\n" + " must be power of two, >= page_size " + "(typically 4)\n" + " ese=0|1 exit on secondary error when 1, else continue\n" + " fua force unit access: 0->don't(def), 1->OFILE, " + "2->IFILE,\n" + " 3->OFILE+IFILE\n" + " ibs IFILE logical block size, cannot differ from " + "obs or bs\n" + " hipri similar to mrq=NRQS operand but also sets " + "hipri flag\n" + " mrq NRQS is number of cmds placed in each sg " + "ioctl\n" + " (def: 16). Does not set mrq hipri flag.\n" + " if mrq=0 does one-by-one, blocking " + "ioctl(SG_IO)s\n" + " obs OFILE logical block size, cannot differ from " + "ibs or bs\n" + " ofreg OFREG is regular file or pipe to send what is " + "read from\n" + " IFILE in the first half of each shared element\n" + " sdt stall detection times: CRT[,ICT]. CRT: check " + "repetition\n" + " time (after first) in seconds; ICT: initial " + "check time\n" + " in milliseconds. Default: 3,300 . Use CRT=0 " + "to disable\n" + " sync 0->no sync(def), 1->SYNCHRONIZE CACHE on OFILE " + "after copy\n" + " thr is number of threads, must be > 0, default 4, " + "max 1024\n" + " time 0->no timing; 1/2->millisec/nanosec precision " + "(def: 1);\n" + " TO is command timeout in seconds (def: 60)\n" + " verbose increase verbosity (def: VERB=0)\n" + " --dry-run|-d prepare but bypass copy/read\n" + " --prefetch|-p with verify: do pre-fetch first\n" + " --verbose|-v increase verbosity of utility\n\n" + "Use '-hhh', '-hhhh' or '-hhhhh' for more information about " + "flags.\n" + ); + return; +page3: + pr2serr("Syntax: sg_mrq_dd [operands] [options]\n\n" + " where: 'iflag=<arg>' and 'oflag=<arg>' arguments are listed " + "below:\n\n" + " 00 use all zeros instead of if=IFILE (only in " + "iflag)\n" + " append append output to OFILE (assumes OFILE is " + "regular file)\n" + " coe continue of error (reading, fills with zeros)\n" + " dio sets the SG_FLAG_DIRECT_IO in sg requests\n" + " direct sets the O_DIRECT flag on open()\n" + " dpo sets the DPO (disable page out) in SCSI READs " + "and WRITEs\n" + " dsync sets the O_SYNC flag on open()\n" + " excl sets the O_EXCL flag on open()\n" + " ff use all 0xff bytes instead of if=IFILE (only in " + "iflag)\n" + " fua sets the FUA (force unit access) in SCSI READs " + "and WRITEs\n" + " hipri set HIPRI flag and use blk_poll() for " + "completions\n" + " masync set 'more async' flag on this sg device\n" + " mmap setup mmap IO on IFILE or OFILE\n" + " mmap,mmap when used twice, doesn't call munmap()\n" + " mout_if set META_OUT_IF flag on control object\n" + " nocreat will fail rather than create OFILE\n" + " nodur turns off command duration calculations\n" + " no_thresh skip checking per fd max data xfer size\n" + " order require write ordering on sg->sg copy; only " + "for oflag\n" + " qhead queue new request at head of block queue\n" + " qtail queue new request at tail of block queue (def: " + "q at head)\n" + " random use random data instead of if=IFILE (only in " + "iflag)\n" + " same_fds each thread of a IOFILE pair uses same fds\n" + " serial serialize sg command execution (def: overlap)\n" + " wq_excl set SG_CTL_FLAGM_EXCL_WAITQ on this sg fd\n" + "\n" + "Copies IFILE to OFILE (and to OFILE2 if given). If IFILE and " + "OFILE are sg\ndevices 'shared' mode is selected. " + "When sharing, the data stays in a\nsingle " + "in-kernel buffer which is copied (or mmap-ed) to the user " + "space\nif the 'ofreg=OFREG' is given. Use '-hhhh' or '-hhhhh' " + "for more information.\n" + ); + return; +page4: + pr2serr("pack_id:\n" + "These are ascending integers, starting at 1, associated with " + "each issued\nSCSI command. When both IFILE and OFILE are sg " + "devices, then the READ in\neach read-write pair is issued an " + "even pack_id and its WRITE pair is\ngiven the pack_id one " + "higher (i.e. an odd number). This enables a\n'cat '" + "/proc/scsi/sg/debug' user to see that progress is being " + "made.\n\n"); + pr2serr("Debugging:\n" + "Apart from using one or more '--verbose' options which gets a " + "bit noisy\n'cat /proc/scsi/sg/debug' can give a good overview " + "of what is happening.\nThat does a sg driver object tree " + "traversal that does minimal locking\nto make sure that each " + "traversal is 'safe'. So it is important to note\nthe whole " + "tree is not locked. This means for fast devices the overall\n" + "tree state may change while the traversal is occurring. For " + "example,\nit has been observed that both the read- and write- " + "sides of a request\nshare show they are in 'active' state " + "which should not be possible.\nIt occurs because the read-side " + "probably jumped out of active state and\nthe write-side " + "request entered it while some other nodes were being " + "printed.\n\n"); + pr2serr("Busy state:\n" + "Busy state (abbreviated to 'bsy' in the /proc/scsi/sg/debug " + "output)\nis entered during request setup and completion. It " + "is intended to be\na temporary state. It should not block " + "but does sometimes (e.g. in\nblock_get_request()). Even so " + "that blockage should be short and if not\nthere is a " + "problem.\n\n"); + pr2serr("--verify :\n" + "For comparing IFILE with OFILE. Does repeated sequences of: " + "READ(ifile)\nand uses data returned to send to VERIFY(ofile, " + "BYTCHK=1). So the OFILE\ndevice/disk is doing the actual " + "comparison. Stops on first miscompare\nunless oflag=coe is " + "given\n\n"); + pr2serr("--prefetch :\n" + "Used with --verify option. Prepends a PRE-FETCH(ofile, IMMED) " + "to verify\nsequence. This should speed the trailing VERIFY by " + "making sure that\nthe data it needs for the comparison is " + "already in its cache.\n"); + return; +page5: + pr2serr(" IFILE and/or OFILE lists\n\n" + "For dd, its if= operand takes a single file (or device), ditto " + "for the of=\noperand. This utility extends that to " + "allowing a comma separated list\nof files. Ideally if multiple " + "IFILEs are given, the same number of OFILEs\nshould be given. " + "Simple expansions occur to make the list lengths equal\n" + "(e.g. if 5 IFILEs are given but no OFILEs, then OFILEs is " + "expanded to 5\n'/dev/null' files). IFILE,OFILE pairs with " + "the same list position are\ncalled a 'slice'. Each slice is " + "processed (i.e. copy or verify) in one or\nmore threads. The " + "number of threads must be >= the number of slices. Best\nif " + "the number of threads is an integer multiple of the number of " + "slices.\nThe file type of multiple IFILEs must be the same, " + "ditto for OFILEs.\nSupport for slices is for testing rather " + "than a general mechanism.\n"); +} + +static void lk_print_command_len(const char *prefix, uint8_t * cmdp, int len, bool lock) { if (lock) { @@ -729,7 +981,14 @@ calc_duration_throughput(int contin) } a = res_tm.tv_sec; a += (0.000001 * res_tm.tv_usec); - b = (double)gcoll.bs * (gcoll.dd_count - gcoll.out_rem_count.load()); + + b = 0.0; + for (auto && cvp : gcoll.cp_ver_arr) { + if (cvp.state == cp_ver_pair_t::my_state::empty) + break; + b += (double)(cvp.dd_count - cvp.out_rem_count.load()); + } + b *= (double)gcoll.bs; pr2serr("time to %s data %s %d.%06d secs", (gcoll.verify ? "verify" : "copy"), (contin ? "so far" : "was"), (int)res_tm.tv_sec, (int)res_tm.tv_usec); @@ -742,22 +1001,35 @@ calc_duration_throughput(int contin) static void print_stats(const char * str) { + bool show_slice = (gcoll.cp_ver_arr.size() > 1); + int k = 0; int64_t infull, outfull; - if (0 != gcoll.out_rem_count.load()) - pr2serr(" remaining block count=%" PRId64 "\n", - gcoll.out_rem_count.load()); - infull = gcoll.dd_count - gcoll.in_rem_count.load(); - pr2serr("%s%" PRId64 "+%d records in\n", str, - infull, gcoll.in_partial.load()); - - if (gcoll.out_type == FT_DEV_NULL) - pr2serr("%s0+0 records out\n", str); - else { - outfull = gcoll.dd_count - gcoll.out_rem_count.load(); - pr2serr("%s%" PRId64 "+%d records %s\n", str, - outfull, gcoll.out_partial.load(), - (gcoll.verify ? "verified" : "out")); + for (auto && cvp : gcoll.cp_ver_arr) { + ++k; + if (cvp.state == cp_ver_pair_t::my_state::empty) + break; + if (cvp.state == cp_ver_pair_t::my_state::ignore) { + pr2serr(">>> IGNORING slice: %d\n", k); + continue; + } + if (show_slice) + pr2serr(">>> slice: %d\n", k); + if (0 != cvp.out_rem_count.load()) + pr2serr(" remaining block count=%" PRId64 "\n", + cvp.out_rem_count.load()); + infull = cvp.dd_count - cvp.in_rem_count.load(); + pr2serr("%s%" PRId64 "+%d records in\n", str, + infull, cvp.in_partial.load()); + + if (cvp.out_type == FT_DEV_NULL) + pr2serr("%s0+0 records out\n", str); + else { + outfull = cvp.dd_count - cvp.out_rem_count.load(); + pr2serr("%s%" PRId64 "+%d records %s\n", str, + outfull, cvp.out_partial.load(), + (gcoll.verify ? "verified" : "out")); + } } } @@ -837,14 +1109,14 @@ dd_filetype(const char * filename, off_t & st_size) return FT_ERROR; if (S_ISCHR(st.st_mode)) { if ((MEM_MAJOR == major(st.st_rdev)) && - (DEV_NULL_MINOR_NUM == minor(st.st_rdev))) - return FT_DEV_NULL; - if (RAW_MAJOR == major(st.st_rdev)) - return FT_RAW; + ((DEV_NULL_MINOR_NUM == minor(st.st_rdev)) || + (DEV_ZERO_MINOR_NUM == minor(st.st_rdev)))) + return FT_DEV_NULL; /* treat /dev/null + /dev/zero the same */ if (SCSI_GENERIC_MAJOR == major(st.st_rdev)) return FT_SG; if (SCSI_TAPE_MAJOR == major(st.st_rdev)) return FT_ST; + return FT_CHAR; } else if (S_ISBLK(st.st_mode)) return FT_BLOCK; else if (S_ISFIFO(st.st_mode)) @@ -853,215 +1125,229 @@ dd_filetype(const char * filename, off_t & st_size) return FT_OTHER; } -static void -usage(int pg_num) +/* Returns reserved_buffer_size/mmap_size if success, else 0 for failure */ +static int +sg_prepare_resbuf(int fd, struct global_collection *clp, bool is_in, + uint8_t **mmpp) { - if (pg_num > 3) - goto page4; - else if (pg_num > 2) - goto page3; - else if (pg_num > 1) - goto page2; + static bool done = false; + bool no_dur = is_in ? clp->in_flags.no_dur : clp->out_flags.no_dur; + bool masync = is_in ? clp->in_flags.masync : clp->out_flags.masync; + bool wq_excl = is_in ? clp->in_flags.wq_excl : clp->out_flags.wq_excl; + bool skip_thresh = is_in ? clp->in_flags.no_thresh : + clp->out_flags.no_thresh; + int elem_sz = clp->elem_sz; + int res, t, num, err; + uint8_t *mmp; + struct sg_extended_info sei {}; + struct sg_extended_info * seip = &sei; - pr2serr("Usage: sg_mrq_dd [bs=BS] [conv=CONV] [count=COUNT] [ibs=BS] " - "[if=IFILE]\n" - " [iflag=FLAGS] [obs=BS] [of=OFILE] " - "[oflag=FLAGS]\n" - " [seek=SEEK] [skip=SKIP] [--help] [--verify] " - "[--version]\n\n"); - pr2serr(" [bpt=BPT] [cdbsz=6|10|12|16] [cdl=CDL] " - "[dio=0|1]\n" - " [elemsz_kb=EKB] [ese=0|1] [fua=0|1|2|3] " - "[hipri=NRQS]\n" - " [mrq=NRQS] [ofreg=OFREG] [sdt=SDT] " - "[sync=0|1]\n" - " [thr=THR] [time=0|1|2[,TO]] [verbose=VERB] " - "[--dry-run]\n" - " [--pre-fetch] [--verbose] [--version]\n\n" - " where: operands have the form name=value and are pecular to " - "'dd'\n" - " style commands, and options start with one or " - "two hyphens;\n" - " the main operands and options (shown in first group " - "above) are:\n" - " bs must be device logical block size (default " - "512)\n" - " conv comma separated list from: [nocreat,noerror," - "notrunc,\n" - " null,sync]\n" - " count number of blocks to copy (def: device size)\n" - " if file or device to read from (def: stdin)\n" - " iflag comma separated list from: [00,coe,dio," - "direct,dpo,\n" - " dsync,excl,ff,fua,masync,mmap,mout_if,nodur," - "null,\n" - " order,qhead,qtail,random,serial,wq_excl]\n" - " of file or device to write to (def: /dev/null " - "N.B. different\n" - " from dd it defaults to stdout). If 'of=.' " - "uses /dev/null\n" - " oflag comma separated list from: [append,nocreat,\n" - " <<list from iflag>>]\n" - " seek block position to start writing to OFILE\n" - " skip block position to start reading from IFILE\n" - " --help|-h output this usage message then exit\n" - " --verify|-x do a verify (compare) operation [def: do a " - "copy]\n" - " --version|-V output version string then exit\n\n" - "Copy IFILE to OFILE, similar to dd command. This utility is " - "specialized for\nSCSI devices and uses the 'multiple requests' " - "(mrq) in a single invocation\nfacility in version 4 of the sg " - "driver unless mrq=0. Usually one or both\nIFILE and OFILE will " - "be sg devices. With the --verify option it does a\n" - "verify/compare operation instead of a copy. This utility is " - "Linux specific.\nUse '-hh', '-hhh' or '-hhhh' for more " - "information.\n" - ); - return; -page2: - pr2serr("Syntax: sg_mrq_dd [operands] [options]\n\n" - " the lesser used operands and option are:\n\n" - " bpt is blocks_per_transfer (default is 128)\n" - " cdbsz size of SCSI READ, WRITE or VERIFY cdb_s " - "(default is 10)\n" - " cdl command duration limits value 0 to 7 (def: " - "0 (no cdl))\n" - " dio is direct IO, 1->attempt, 0->indirect IO (def)\n" - " elemsz_kb=EKB scatter gather list element size in " - "kibibytes;\n" - " must be power of two, >= page_size " - "(typically 4)\n" - " ese=0|1 exit on secondary error when 1, else continue\n" - " fua force unit access: 0->don't(def), 1->OFILE, " - "2->IFILE,\n" - " 3->OFILE+IFILE\n" - " ibs IFILE logical block size, cannot differ from " - "obs or bs\n" - " hipri similar to mrq=NRQS operand but also sets " - "hipri flag\n" - " mrq NRQS is number of cmds placed in each sg " - "ioctl\n" - " (def: 16). Does not set mrq hipri flag.\n" - " if mrq=0 does one-by-one, blocking " - "ioctl(SG_IO)s\n" - " obs OFILE logical block size, cannot differ from " - "ibs or bs\n" - " ofreg OFREG is regular file or pipe to send what is " - "read from\n" - " IFILE in the first half of each shared element\n" - " sdt stall detection times: CRT[,ICT]. CRT: check " - "repetition\n" - " time (after first) in seconds; ICT: initial " - "check time\n" - " in milliseconds. Default: 3,300 . Use CRT=0 " - "to disable\n" - " sync 0->no sync(def), 1->SYNCHRONIZE CACHE on OFILE " - "after copy\n" - " thr is number of threads, must be > 0, default 4, " - "max 1024\n" - " time 0->no timing; 1/2->millisec/nanosec precision " - "(def: 1);\n" - " TO is command timeout in seconds (def: 60)\n" - " verbose increase verbosity (def: VERB=0)\n" - " --dry-run|-d prepare but bypass copy/read\n" - " --prefetch|-p with verify: do pre-fetch first\n" - " --verbose|-v increase verbosity of utility\n\n" - "Use '-hhh' or '-hhhh' for more information about flags.\n" - ); - return; -page3: - pr2serr("Syntax: sg_mrq_dd [operands] [options]\n\n" - " where: 'iflag=<arg>' and 'oflag=<arg>' arguments are listed " - "below:\n\n" - " 00 use all zeros instead of if=IFILE (only in " - "iflag)\n" - " append append output to OFILE (assumes OFILE is " - "regular file)\n" - " coe continue of error (reading, fills with zeros)\n" - " dio sets the SG_FLAG_DIRECT_IO in sg requests\n" - " direct sets the O_DIRECT flag on open()\n" - " dpo sets the DPO (disable page out) in SCSI READs " - "and WRITEs\n" - " dsync sets the O_SYNC flag on open()\n" - " excl sets the O_EXCL flag on open()\n" - " ff use all 0xff bytes instead of if=IFILE (only in " - "iflag)\n" - " fua sets the FUA (force unit access) in SCSI READs " - "and WRITEs\n" - " hipri set HIPRI flag and use blk_poll() for " - "completions\n" - " masync set 'more async' flag on this sg device\n" - " mmap setup mmap IO on IFILE or OFILE\n" - " mmap,mmap when used twice, doesn't call munmap()\n" - " mout_if set META_OUT_IF flag on control object\n" - " nocreat will fail rather than create OFILE\n" - " nodur turns off command duration calculations\n" - " no_thresh skip checking per fd max data xfer size\n" - " order require write ordering on sg->sg copy; only " - "for oflag\n" - " qhead queue new request at head of block queue\n" - " qtail queue new request at tail of block queue (def: " - "q at head)\n" - " random use random data instead of if=IFILE (only in " - "iflag)\n" - " serial serialize sg command execution (def: overlap)\n" - " wq_excl set SG_CTL_FLAGM_EXCL_WAITQ on this sg fd\n" - "\n" - "Copies IFILE to OFILE (and to OFILE2 if given). If IFILE and " - "OFILE are sg\ndevices 'shared' mode is selected. " - "When sharing, the data stays in a\nsingle " - "in-kernel buffer which is copied (or mmap-ed) to the user " - "space\nif the 'ofreg=OFREG' is given. Use '-hhhh' for more " - "information.\n" - ); - return; -page4: - pr2serr("pack_id:\n" - "These are ascending integers, starting at 1, associated with " - "each issued\nSCSI command. When both IFILE and OFILE are sg " - "devices, then the READ in\neach read-write pair is issued an " - "even pack_id and its WRITE pair is\ngiven the pack_id one " - "higher (i.e. an odd number). This enables a\n'cat '" - "/proc/scsi/sg/debug' user to see that progress is being " - "made.\n\n"); - pr2serr("Debugging:\n" - "Apart from using one or more '--verbose' options which gets a " - "bit noisy\n'cat /proc/scsi/sg/debug' can give a good overview " - "of what is happening.\nThat does a sg driver object tree " - "traversal that does minimal locking\nto make sure that each " - "traversal is 'safe'. So it is important to note\nthe whole " - "tree is not locked. This means for fast devices the overall\n" - "tree state may change while the traversal is occurring. For " - "example,\nit has been observed that both the read- and write- " - "sides of a request\nshare show they are in 'active' state " - "which should not be possible.\nIt occurs because the read-side " - "probably jumped out of active state and\nthe write-side " - "request entered it while some other nodes were being " - "printed.\n\n"); - pr2serr("Busy state:\n" - "Busy state (abbreviated to 'bsy' in the /proc/scsi/sg/debug " - "output)\nis entered during request setup and completion. It " - "is intended to be\na temporary state. It should not block " - "but does sometimes (e.g. in\nblock_get_request()). Even so " - "that blockage should be short and if not\nthere is a " - "problem.\n\n"); - pr2serr("--verify :\n" - "For comparing IFILE with OFILE. Does repeated sequences of: " - "READ(ifile)\nand uses data returned to send to VERIFY(ofile, " - "BYTCHK=1). So the OFILE\ndevice/disk is doing the actual " - "comparison. Stops on first miscompare\nunless oflag=coe is " - "given\n\n"); - pr2serr("--prefetch :\n" - "Used with --verify option. Prepends a PRE-FETCH(ofile, IMMED) " - "to verify\nsequence. This should speed the trailing VERIFY by " - "making sure that\nthe data it needs for the comparison is " - "already in its cache.\n"); - return; + res = ioctl(fd, SG_GET_VERSION_NUM, &t); + if ((res < 0) || (t < 40000)) { + if (ioctl(fd, SG_GET_RESERVED_SIZE, &num) < 0) { + perror("SG_GET_RESERVED_SIZE ioctl failed"); + return 0; + } + if (! done) { + done = true; + pr2serr_lk("%ssg driver prior to 4.0.00, reduced functionality\n", + my_name); + } + goto bypass; + } + if (elem_sz >= 4096) { + seip->sei_rd_mask |= SG_SEIM_SGAT_ELEM_SZ; + res = ioctl(fd, SG_SET_GET_EXTENDED, seip); + if (res < 0) + pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(SGAT_ELEM_SZ) rd " + "error: %s\n", __func__, strerror(errno)); + if (elem_sz != (int)seip->sgat_elem_sz) { + seip->sei_wr_mask |= SG_SEIM_SGAT_ELEM_SZ; + seip->sgat_elem_sz = elem_sz; + res = ioctl(fd, SG_SET_GET_EXTENDED, seip); + if (res < 0) + pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(SGAT_ELEM_SZ) " + "wr error: %s\n", __func__, strerror(errno)); + } + } + if (no_dur || masync || skip_thresh) { + seip->sei_wr_mask |= SG_SEIM_CTL_FLAGS; + if (no_dur) { + seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_NO_DURATION; + seip->ctl_flags |= SG_CTL_FLAGM_NO_DURATION; + } + if (masync) { + seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_MORE_ASYNC; + seip->ctl_flags |= SG_CTL_FLAGM_MORE_ASYNC; + } + if (wq_excl) { + seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_EXCL_WAITQ; + seip->ctl_flags |= SG_CTL_FLAGM_EXCL_WAITQ; + } + if (skip_thresh) { + seip->tot_fd_thresh = 0; + sei.sei_wr_mask |= SG_SEIM_TOT_FD_THRESH; + } + res = ioctl(fd, SG_SET_GET_EXTENDED, seip); + if (res < 0) + pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(NO_DURATION) " + "error: %s\n", __func__, strerror(errno)); + } +bypass: + num = clp->bs * clp->bpt; + res = ioctl(fd, SG_SET_RESERVED_SIZE, &num); + if (res < 0) { + perror("sg_mrq_dd: SG_SET_RESERVED_SIZE error"); + return 0; + } else { + int nn; + + res = ioctl(fd, SG_GET_RESERVED_SIZE, &nn); + if (res < 0) { + perror("sg_mrq_dd: SG_GET_RESERVED_SIZE error"); + return 0; + } + if (nn < num) { + pr2serr_lk("%s: SG_GET_RESERVED_SIZE shows size truncated, " + "wanted %d got %d\n", __func__, num, nn); + return 0; + } + if (mmpp) { + mmp = (uint8_t *)mmap(NULL, num, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (MAP_FAILED == mmp) { + err = errno; + pr2serr_lk("sg_mrq_dd: %s: sz=%d, fd=%d, mmap() failed: %s\n", + __func__, num, fd, strerror(err)); + return 0; + } + *mmpp = mmp; + } + } + t = 1; + res = ioctl(fd, SG_SET_FORCE_PACK_ID, &t); + if (res < 0) + perror("sg_mrq_dd: SG_SET_FORCE_PACK_ID error"); + if (clp->unit_nanosec) { + seip->sei_wr_mask |= SG_SEIM_CTL_FLAGS; + seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_TIME_IN_NS; + seip->ctl_flags |= SG_CTL_FLAGM_TIME_IN_NS; + if (ioctl(fd, SG_SET_GET_EXTENDED, seip) < 0) { + res = -1; + pr2serr_lk("ioctl(EXTENDED(TIME_IN_NS)) failed, errno=%d %s\n", + errno, strerror(errno)); + } + } + if (clp->verbose) { + t = 1; + /* more info in /proc/scsi/sg/debug */ + res = ioctl(fd, SG_SET_DEBUG, &t); + if (res < 0) + perror("sg_mrq_dd: SG_SET_DEBUG error"); + } + return (res < 0) ? 0 : num; +} + +static int +sg_in_open(struct global_collection *clp, const string & inf, uint8_t **mmpp, + int * mmap_lenp) +{ + int fd, err, n; + int flags = O_RDWR; + char ebuff[EBUFF_SZ]; + const char * fnp = inf.data(); + + if (clp->in_flags.direct) + flags |= O_DIRECT; + if (clp->in_flags.excl) + flags |= O_EXCL; + if (clp->in_flags.dsync) + flags |= O_SYNC; + + if ((fd = open(fnp, flags)) < 0) { + err = errno; + snprintf(ebuff, EBUFF_SZ, "%s: could not open %s for sg reading", + __func__, fnp); + perror(ebuff); + return -sg_convert_errno(err); + } + n = sg_prepare_resbuf(fd, clp, true, mmpp); + if (n <= 0) + return -SG_LIB_FILE_ERROR; + if (mmap_lenp) + *mmap_lenp = n; + return fd; +} + +static int +sg_out_open(struct global_collection *clp, const string & outf, + uint8_t **mmpp, int * mmap_lenp) +{ + int fd, err, n; + int flags = O_RDWR; + char ebuff[EBUFF_SZ]; + const char * fnp = outf.data(); + + if (clp->out_flags.direct) + flags |= O_DIRECT; + if (clp->out_flags.excl) + flags |= O_EXCL; + if (clp->out_flags.dsync) + flags |= O_SYNC; + + if ((fd = open(fnp, flags)) < 0) { + err = errno; + snprintf(ebuff, EBUFF_SZ, "%s: could not open %s for sg %s", + __func__, fnp, (clp->verify ? "verifying" : "writing")); + perror(ebuff); + return -sg_convert_errno(err); + } + n = sg_prepare_resbuf(fd, clp, false, mmpp); + if (n <= 0) + return -SG_LIB_FILE_ERROR; + if (mmap_lenp) + *mmap_lenp = n; + return fd; } +static int +reg_file_open(struct global_collection *clp, const string & fn_s, + bool for_wr) +{ + int fd, flags; + char ebuff[EBUFF_SZ]; -get_next_res -global_collection::get_next(int desired_num_blks) + if (for_wr) { + flags = O_WRONLY; + if (! clp->out_flags.nocreat) + flags |= O_CREAT; + if (clp->out_flags.append) + flags |= O_APPEND; + } else + flags = O_RDONLY; + if (clp->in_flags.direct) + flags |= O_DIRECT; + if (clp->in_flags.excl) + flags |= O_EXCL; + if (clp->in_flags.dsync) + flags |= O_SYNC; + + if (for_wr) + fd = open(fn_s.data(), flags, 0666); + else + fd = open(fn_s.data(), flags); + if (fd < 0) { + int err = errno; + snprintf(ebuff, EBUFF_SZ, "%scould not open %s for %sing ", + my_name, fn_s.data(), (for_wr ? "writ" : "read")); + perror(ebuff); + return -err; + } + return fd; +} + +get_next_res_t +cp_ver_pair_t::get_next(int desired_num_blks) { int64_t expected, desired; @@ -1151,6 +1437,16 @@ read_blkdev_capacity(int sg_fd, int64_t * num_sect, int * sect_sz) #endif } +static void +flag_all_stop(struct global_collection * clp) +{ + for (auto && elem : clp->cp_ver_arr) { + if (elem.state == cp_ver_pair_t::my_state::empty) + break; + elem.next_count_pos.store(-1); + } +} + /* Has an infinite loop doing a timed wait for any signals in sig_set. After * each timeout (300 ms) checks if the most_recent_pack_id atomic integer * has changed. If not after another two timeouts announces a stall has @@ -1197,7 +1493,7 @@ sig_listen_thread(struct global_collection * clp) } if (SIGINT == sig_number) { pr2serr_lk("%sinterrupted by SIGINT\n", my_name); - clp->next_count_pos.store(-1); + flag_all_stop(clp); shutting_down.store(true); } } /* end of while loop */ @@ -1249,7 +1545,8 @@ sg_take_snap(int sg_fd, int id, bool vb_b) // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< /* Each thread's "main" function */ static void -read_write_thread(struct global_collection * clp, int id, bool singleton) +read_write_thread(struct global_collection * clp, int thr_idx, int slice_idx, + bool singleton) { Rq_elem rel {}; Rq_elem * rep = &rel; @@ -1260,8 +1557,11 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) bool in_is_sg, in_mmap, out_is_sg, out_mmap; bool own_outfd = false; bool only_one_sg = false; + struct cp_ver_pair_t & cvp = clp->cp_ver_arr[slice_idx]; class scat_gath_iter i_sg_it(clp->i_sgl); class scat_gath_iter o_sg_it(clp->o_sgl); + const string & inf = clp->inf_v[slice_idx]; + const string & outf = clp->outf_v[slice_idx]; vector<cdb_arr_t> a_cdb; vector<struct sg_io_v4> a_v4; @@ -1272,7 +1572,7 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) out_is_sg = (FT_SG == clp->out_type); out_mmap = (out_is_sg && (clp->out_flags.mmap > 0)); rep->clp = clp; - rep->id = id; + rep->id = thr_idx; if (in_is_sg && out_is_sg) rep->both_sg = true; @@ -1284,8 +1584,12 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) rep->only_out_sg = true; } - if (vb > 2) - pr2serr_lk("%d <-- Starting worker thread\n", id); + if (vb > 2) { + pr2serr_lk("%d <-- Starting worker thread, slice=%d\n", thr_idx, + slice_idx); + if (vb > 3) + pr2serr_lk(" %s ---> %s\n", inf.data(), outf.data()); + } if (! (rep->both_sg || in_mmap)) { rep->buffp = sg_memalign(sz, 0 /* page align */, &rep->alloc_bp, false); @@ -1294,12 +1598,32 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) return; } } - rep->infd = clp->infd; - rep->outfd = clp->outfd; + rep->infd = clp->in0fd; + rep->outfd = clp->out0fd; rep->outregfd = clp->outregfd; rep->rep_count = 0; rep->in_follow_on = -1; rep->out_follow_on = -1; + if (cvp.state == cp_ver_pair_t::my_state::init) + cvp.state = cp_ver_pair_t::my_state::underway; + if (FT_OTHER == cvp.in_type) { + fd = reg_file_open(clp, inf, false); + if (fd < 0) { + pr2serr_lk("[%d]: unable to open IFILE of slice=%d\n", thr_idx, + slice_idx); + return; + } + rep->infd = fd; + } + if (FT_OTHER == cvp.out_type) { + fd = reg_file_open(clp, outf, true); + if (fd < 0) { + pr2serr_lk("[%d]: unable to open OFILE of slice=%d\n", thr_idx, + slice_idx); + return; + } + rep->outfd = fd; + } if (rep->infd == rep->outfd) { if (in_is_sg) @@ -1310,80 +1634,100 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) ssize_t ssz = getrandom(&rep->seed, sizeof(rep->seed), GRND_NONBLOCK); if (ssz < (ssize_t)sizeof(rep->seed)) { - pr2serr_lk("[%d] %s: getrandom() failed, ret=%d\n", id, __func__, - (int)ssz); + pr2serr_lk("[%d] %s: getrandom() failed, ret=%d\n", thr_idx, + __func__, (int)ssz); rep->seed = (long)time(NULL); } #else rep->seed = (long)time(NULL); /* use seconds since epoch as proxy */ #endif if (vb > 1) - pr2serr_lk("[%d] %s: seed=%ld\n", id, __func__, rep->seed); + pr2serr_lk("[%d] %s: seed=%ld\n", thr_idx, __func__, rep->seed); srand48_r(rep->seed, &rep->drand); } - if (in_is_sg && clp->infp) { - fd = sg_in_open(clp, clp->infp, (in_mmap ? &rep->buffp : NULL), - (in_mmap ? &rep->mmap_len : NULL), true); - if (fd < 0) - goto fini; + if (in_is_sg && inf.size()) { + if ((clp->in_flags.same_fds || (0 == thr_idx)) && + (cvp.in_fd >= 0)) + fd = cvp.in_fd; + else { + fd = sg_in_open(clp, inf, (in_mmap ? &rep->buffp : NULL), + (in_mmap ? &rep->mmap_len : NULL)); + if (fd < 0) + goto fini; + own_infd = true; + if (cvp.in_fd < 0) + cvp.in_fd = fd; + } rep->infd = fd; rep->mmap_active = in_mmap ? clp->in_flags.mmap : 0; if (in_mmap && (vb > 4)) - pr2serr_lk("[%d] %s: mmap buffp=%p\n", id, __func__, rep->buffp); - own_infd = true; + pr2serr_lk("[%d] %s: mmap buffp=%p\n", thr_idx, __func__, + rep->buffp); ++num_sg; if (vb > 2) - pr2serr_lk("[%d]: opened local sg IFILE\n", id); + pr2serr_lk("[%d]: opened local sg IFILE\n", thr_idx); } - if (out_is_sg && clp->outfp) { - fd = sg_out_open(clp, clp->outfp, (out_mmap ? &rep->buffp : NULL), - (out_mmap ? &rep->mmap_len : NULL), true); - if (fd < 0) - goto fini; + if (out_is_sg && outf.size()) { + if ((clp->out_flags.same_fds || (0 == thr_idx)) && + (cvp.out_fd >= 0)) + fd = cvp.out_fd; + else { + fd = sg_out_open(clp, outf, (out_mmap ? &rep->buffp : NULL), + (out_mmap ? &rep->mmap_len : NULL)); + if (fd < 0) + goto fini; + own_outfd = true; + if (cvp.out_fd < 0) + cvp.out_fd = fd; + } rep->outfd = fd; if (! rep->mmap_active) rep->mmap_active = out_mmap ? clp->out_flags.mmap : 0; if (out_mmap && (vb > 4)) - pr2serr_lk("[%d]: mmap buffp=%p\n", id, rep->buffp); - own_outfd = true; + pr2serr_lk("[%d]: mmap buffp=%p\n", thr_idx, rep->buffp); ++num_sg; if (vb > 2) - pr2serr_lk("[%d]: opened local sg OFILE\n", id); + pr2serr_lk("[%d]: opened local sg OFILE\n", thr_idx); } if (vb > 2) { if (in_is_sg && (! own_infd)) - pr2serr_lk("[%d]: using global sg IFILE, fd=%d\n", id, rep->infd); + pr2serr_lk("[%d]: using global sg IFILE, fd=%d\n", thr_idx, + rep->infd); if (out_is_sg && (! own_outfd)) - pr2serr_lk("[%d]: using global sg OFILE, fd=%d\n", id, rep->outfd); + pr2serr_lk("[%d]: using global sg OFILE, fd=%d\n", thr_idx, + rep->outfd); } if (rep->both_sg) - rep->has_share = sg_share_prepare(rep->outfd, rep->infd, id, vb > 9); + rep->has_share = sg_share_prepare(rep->outfd, rep->infd, thr_idx, + vb > 9); if (vb > 9) - pr2serr_lk("[%d]: has_share=%s\n", id, + pr2serr_lk("[%d]: has_share=%s\n", thr_idx, (rep->has_share ? "true" : "false")); // share_and_ofreg = (rep->has_share && (rep->outregfd >= 0)); /* vvvvvvvvvvvvvv Main segment copy loop vvvvvvvvvvvvvvvvvvvvvvv */ while (! shutting_down) { - get_next_res gnr = clp->get_next(clp->mrq_num * clp->bpt); + get_next_res_t gnr = cvp.get_next(clp->mrq_num * clp->bpt); seg_blks = gnr.second; if (seg_blks <= 0) { if (seg_blks < 0) res = -seg_blks; + else + cvp.state = cp_ver_pair_t::my_state::finished; break; } if (! i_sg_it.set_by_blk_idx(gnr.first)) { lock_guard<mutex> lk(strerr_mut); - pr2serr_lk("[%d]: input set_by_blk_idx() failed\n", id); + pr2serr_lk("[%d]: input set_by_blk_idx() failed\n", thr_idx); i_sg_it.dbg_print("input after set_by_blk_idx", false, vb > 5); res = 2; break; } if (! o_sg_it.set_by_blk_idx(gnr.first)) { - pr2serr_lk("[%d]: output set_by_blk_idx() failed\n", id); + pr2serr_lk("[%d]: output set_by_blk_idx() failed\n", thr_idx); res = 3; break; } @@ -1433,8 +1777,11 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) } } /* ^^^^^^^^^^ end of main while loop which copies segments ^^^^^^ */ - if (shutting_down) + if (shutting_down) { + if (vb > 3) + pr2serr_lk("%s: t=%d: shutting down\n", __func__, rep->id); goto fini; + } if (singleton) { { lock_guard<mutex> lk(clp->infant_mut); @@ -1446,7 +1793,7 @@ read_write_thread(struct global_collection * clp, int id, bool singleton) } if (res < 0) { if (seg_blks >= 0) - clp->get_next(-1); /* flag error to main */ + cvp.get_next(-1); /* flag error to main */ pr2serr_lk("%s: t=%d: aborting, res=%d\n", __func__, rep->id, res); } @@ -1495,11 +1842,15 @@ fini: close(rep->outfd); } /* pass stats back to read-side */ - clp->in_rem_count -= rep->in_local_count; - clp->out_rem_count -= rep->out_local_count; - clp->in_partial += rep->in_local_partial; - clp->out_partial += rep->out_local_partial; - clp->sum_of_resids += rep->in_resid_bytes; + if (vb > 3) + pr2serr_lk("%s: [%d] leaving: in/out local count=%" PRId64 "/%" + PRId64 "\n", __func__, rep->id, rep->in_local_count, + rep->out_local_count); + cvp.in_rem_count -= rep->in_local_count; + cvp.out_rem_count -= rep->out_local_count; + cvp.in_partial += rep->in_local_partial; + cvp.out_partial += rep->out_local_partial; + cvp.sum_of_resids += rep->in_resid_bytes; if (rep->alloc_bp) free(rep->alloc_bp); } @@ -1555,7 +1906,7 @@ normal_in_rd(Rq_elem * rep, int64_t lba, int blocks, int d_boff) } } bp = rep->buffp + d_boff; - while (((res = read(clp->infd, bp, blocks * clp->bs)) < 0) && + while (((res = read(rep->infd, bp, blocks * clp->bs)) < 0) && ((EINTR == errno) || (EAGAIN == errno))) std::this_thread::yield();/* another thread may be able to progress */ if (res < 0) { @@ -1612,7 +1963,7 @@ normal_out_wr(Rq_elem * rep, int64_t lba, int blocks, int d_boff) rep->out_follow_on = pos; } } - while (((res = write(clp->outfd, bp, blocks * clp->bs)) + while (((res = write(rep->outfd, bp, blocks * clp->bs)) < 0) && ((EINTR == errno) || (EAGAIN == errno))) std::this_thread::yield();/* another thread may be able to progress */ if (res < 0) { @@ -1653,7 +2004,7 @@ extra_out_wr(Rq_elem * rep, int num_bytes, int d_boff) pr2serr_lk("[%d] %s: num_bytes=%d, d_boff=%d\n", id, __func__, num_bytes, d_boff); - while (((res = write(clp->outfd, bp, num_bytes)) + while (((res = write(clp->out0fd, bp, num_bytes)) < 0) && ((EINTR == errno) || (EAGAIN == errno))) std::this_thread::yield();/* another thread may be able to progress */ if (res < 0) { @@ -2897,129 +3248,6 @@ sg_blk_poll(int fd, int num) } #endif -/* Returns reserved_buffer_size/mmap_size if success, else 0 for failure */ -static int -sg_prepare_resbuf(int fd, struct global_collection *clp, bool is_in, - uint8_t **mmpp) -{ - static bool done = false; - bool no_dur = is_in ? clp->in_flags.no_dur : clp->out_flags.no_dur; - bool masync = is_in ? clp->in_flags.masync : clp->out_flags.masync; - bool wq_excl = is_in ? clp->in_flags.wq_excl : clp->out_flags.wq_excl; - bool skip_thresh = is_in ? clp->in_flags.no_thresh : - clp->out_flags.no_thresh; - int elem_sz = clp->elem_sz; - int res, t, num, err; - uint8_t *mmp; - struct sg_extended_info sei {}; - struct sg_extended_info * seip = &sei; - - res = ioctl(fd, SG_GET_VERSION_NUM, &t); - if ((res < 0) || (t < 40000)) { - if (ioctl(fd, SG_GET_RESERVED_SIZE, &num) < 0) { - perror("SG_GET_RESERVED_SIZE ioctl failed"); - return 0; - } - if (! done) { - done = true; - pr2serr_lk("%ssg driver prior to 4.0.00, reduced functionality\n", - my_name); - } - goto bypass; - } - if (elem_sz >= 4096) { - seip->sei_rd_mask |= SG_SEIM_SGAT_ELEM_SZ; - res = ioctl(fd, SG_SET_GET_EXTENDED, seip); - if (res < 0) - pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(SGAT_ELEM_SZ) rd " - "error: %s\n", __func__, strerror(errno)); - if (elem_sz != (int)seip->sgat_elem_sz) { - seip->sei_wr_mask |= SG_SEIM_SGAT_ELEM_SZ; - seip->sgat_elem_sz = elem_sz; - res = ioctl(fd, SG_SET_GET_EXTENDED, seip); - if (res < 0) - pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(SGAT_ELEM_SZ) " - "wr error: %s\n", __func__, strerror(errno)); - } - } - if (no_dur || masync || skip_thresh) { - seip->sei_wr_mask |= SG_SEIM_CTL_FLAGS; - if (no_dur) { - seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_NO_DURATION; - seip->ctl_flags |= SG_CTL_FLAGM_NO_DURATION; - } - if (masync) { - seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_MORE_ASYNC; - seip->ctl_flags |= SG_CTL_FLAGM_MORE_ASYNC; - } - if (wq_excl) { - seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_EXCL_WAITQ; - seip->ctl_flags |= SG_CTL_FLAGM_EXCL_WAITQ; - } - if (skip_thresh) { - seip->tot_fd_thresh = 0; - sei.sei_wr_mask |= SG_SEIM_TOT_FD_THRESH; - } - res = ioctl(fd, SG_SET_GET_EXTENDED, seip); - if (res < 0) - pr2serr_lk("sg_mrq_dd: %s: SG_SET_GET_EXTENDED(NO_DURATION) " - "error: %s\n", __func__, strerror(errno)); - } -bypass: - num = clp->bs * clp->bpt; - res = ioctl(fd, SG_SET_RESERVED_SIZE, &num); - if (res < 0) { - perror("sg_mrq_dd: SG_SET_RESERVED_SIZE error"); - return 0; - } else { - int nn; - - res = ioctl(fd, SG_GET_RESERVED_SIZE, &nn); - if (res < 0) { - perror("sg_mrq_dd: SG_GET_RESERVED_SIZE error"); - return 0; - } - if (nn < num) { - pr2serr_lk("%s: SG_GET_RESERVED_SIZE shows size truncated, " - "wanted %d got %d\n", __func__, num, nn); - return 0; - } - if (mmpp) { - mmp = (uint8_t *)mmap(NULL, num, PROT_READ | PROT_WRITE, - MAP_SHARED, fd, 0); - if (MAP_FAILED == mmp) { - err = errno; - pr2serr_lk("sg_mrq_dd: %s: sz=%d, fd=%d, mmap() failed: %s\n", - __func__, num, fd, strerror(err)); - return 0; - } - *mmpp = mmp; - } - } - t = 1; - res = ioctl(fd, SG_SET_FORCE_PACK_ID, &t); - if (res < 0) - perror("sg_mrq_dd: SG_SET_FORCE_PACK_ID error"); - if (clp->unit_nanosec) { - seip->sei_wr_mask |= SG_SEIM_CTL_FLAGS; - seip->ctl_flags_wr_mask |= SG_CTL_FLAGM_TIME_IN_NS; - seip->ctl_flags |= SG_CTL_FLAGM_TIME_IN_NS; - if (ioctl(fd, SG_SET_GET_EXTENDED, seip) < 0) { - res = -1; - pr2serr_lk("ioctl(EXTENDED(TIME_IN_NS)) failed, errno=%d %s\n", - errno, strerror(errno)); - } - } - if (clp->verbose) { - t = 1; - /* more info in /proc/scsi/sg/debug */ - res = ioctl(fd, SG_SET_DEBUG, &t); - if (res < 0) - perror("sg_mrq_dd: SG_SET_DEBUG error"); - } - return (res < 0) ? 0 : num; -} - /* Returns the number of times 'ch' is found in string 's' given the * string's length. */ static int @@ -3173,6 +3401,9 @@ process_flags(const char * arg, struct flags_t * fp) fp->random = true; else if ((0 == strcmp(cp, "mout_if")) || (0 == strcmp(cp, "mout-if"))) fp->mout_if = true; + else if ((0 == strcmp(cp, "same_fds")) || + (0 == strcmp(cp, "same-fds"))) + fp->same_fds = true; else if (0 == strcmp(cp, "serial")) fp->serial = true; else if (0 == strcmp(cp, "swait")) @@ -3188,72 +3419,6 @@ process_flags(const char * arg, struct flags_t * fp) return true; } -static int -sg_in_open(struct global_collection *clp, const char *inf, uint8_t **mmpp, - int * mmap_lenp, bool move_data) -{ - int fd, err, n; - int flags = O_RDWR; - char ebuff[EBUFF_SZ]; - - if (clp->in_flags.direct) - flags |= O_DIRECT; - if (clp->in_flags.excl) - flags |= O_EXCL; - if (clp->in_flags.dsync) - flags |= O_SYNC; - - if ((fd = open(inf, flags)) < 0) { - err = errno; - snprintf(ebuff, EBUFF_SZ, "%s: could not open %s for sg reading", - __func__, inf); - perror(ebuff); - return -sg_convert_errno(err); - } - if (move_data) { - n = sg_prepare_resbuf(fd, clp, true, mmpp); - if (n <= 0) - return -SG_LIB_FILE_ERROR; - } else - n = 0; - if (mmap_lenp) - *mmap_lenp = n; - return fd; -} - -static int -sg_out_open(struct global_collection *clp, const char *outf, uint8_t **mmpp, - int * mmap_lenp, bool move_data) -{ - int fd, err, n; - int flags = O_RDWR; - char ebuff[EBUFF_SZ]; - - if (clp->out_flags.direct) - flags |= O_DIRECT; - if (clp->out_flags.excl) - flags |= O_EXCL; - if (clp->out_flags.dsync) - flags |= O_SYNC; - - if ((fd = open(outf, flags)) < 0) { - err = errno; - snprintf(ebuff, EBUFF_SZ, "%s: could not open %s for sg %s", - __func__, outf, (clp->verify ? "verifying" : "writing")); - perror(ebuff); - return -sg_convert_errno(err); - } - if (move_data) { - n = sg_prepare_resbuf(fd, clp, false, mmpp); - if (n <= 0) - return -SG_LIB_FILE_ERROR; - } else - n = 0; - if (mmap_lenp) - *mmap_lenp = n; - return fd; -} - /* Process arguments given to 'conv=" option. Returns 0 on success, * 1 on error. */ static int @@ -3299,7 +3464,7 @@ process_conv(const char * arg, struct flags_t * ifp, struct flags_t * ofp) static int parse_cmdline_sanity(int argc, char * argv[], struct global_collection * clp, - char * inf, char * outf, char * outregf) + char * outregf) { bool contra = false; bool verbose_given = false; @@ -3438,12 +3603,16 @@ parse_cmdline_sanity(int argc, char * argv[], struct global_collection * clp, goto syn_err; } } else if (0 == strcmp(key, "if")) { - if ('\0' != inf[0]) { + if (clp->inf_v.size() > 0) { pr2serr("Second 'if=' argument??\n"); goto syn_err; } else { - memcpy(inf, buf, INOUTF_SZ); - inf[INOUTF_SZ - 1] = '\0'; /* noisy compiler */ + cp = buf; + while ((ccp = strchr(cp, ','))) { + clp->inf_v.push_back(string(cp , ccp - cp)); + cp = ccp + 1; + } + clp->inf_v.push_back(string(cp , strlen(cp))); } } else if (0 == strcmp(key, "iflag")) { if (! process_flags(buf, &clp->in_flags)) { @@ -3497,12 +3666,16 @@ parse_cmdline_sanity(int argc, char * argv[], struct global_collection * clp, outregf[INOUTF_SZ - 1] = '\0'; /* noisy compiler */ } } else if (strcmp(key, "of") == 0) { - if ('\0' != outf[0]) { + if (clp->outf_v.size() > 0) { pr2serr("Second 'of=' argument??\n"); goto syn_err; } else { - memcpy(outf, buf, INOUTF_SZ); - outf[INOUTF_SZ - 1] = '\0'; /* noisy compiler */ + cp = buf; + while ((ccp = strchr(cp, ','))) { + clp->outf_v.push_back(string(cp , ccp - cp)); + cp = ccp + 1; + } + clp->outf_v.push_back(string(cp , strlen(cp))); } } else if (0 == strcmp(key, "oflag")) { if (! process_flags(buf, &clp->out_flags)) { @@ -3728,10 +3901,10 @@ calc_count(struct global_collection * clp, const char * inf, out_num_sect = -1; } if (FT_SG == clp->in_type) { - res = scsi_read_capacity(clp->infd, &in_num_sect, &in_sect_sz); + res = scsi_read_capacity(clp->in0fd, &in_num_sect, &in_sect_sz); if (2 == res) { pr2serr("Unit attention, media changed(in), continuing\n"); - res = scsi_read_capacity(clp->infd, &in_num_sect, + res = scsi_read_capacity(clp->in0fd, &in_num_sect, &in_sect_sz); } if (0 != res) { @@ -3750,10 +3923,10 @@ calc_count(struct global_collection * clp, const char * inf, } } if (FT_SG == clp->out_type) { - res = scsi_read_capacity(clp->outfd, &out_num_sect, &out_sect_sz); + res = scsi_read_capacity(clp->out0fd, &out_num_sect, &out_sect_sz); if (2 == res) { pr2serr("Unit attention, media changed(out), continuing\n"); - res = scsi_read_capacity(clp->outfd, &out_num_sect, + res = scsi_read_capacity(clp->out0fd, &out_num_sect, &out_sect_sz); } if (0 != res) { @@ -3777,7 +3950,7 @@ calc_count(struct global_collection * clp, const char * inf, if (FT_SG == clp->in_type) ; else if (FT_BLOCK == clp->in_type) { - if (0 != read_blkdev_capacity(clp->infd, &in_num_sect, + if (0 != read_blkdev_capacity(clp->in0fd, &in_num_sect, &in_sect_sz)) { pr2serr("Unable to read block capacity on %s\n", inf); in_num_sect = -1; @@ -3792,7 +3965,7 @@ calc_count(struct global_collection * clp, const char * inf, if (FT_SG == clp->out_type) ; else if (FT_BLOCK == clp->out_type) { - if (0 != read_blkdev_capacity(clp->outfd, &out_num_sect, + if (0 != read_blkdev_capacity(clp->out0fd, &out_num_sect, &out_sect_sz)) { pr2serr("Unable to read block capacity on %s\n", outf); out_num_sect = -1; @@ -3943,10 +4116,12 @@ int main(int argc, char * argv[]) { bool fail_after_cli = false; - char inf[INOUTF_SZ]; - char outf[INOUTF_SZ]; + bool ifile_given = true; + // char inf[INOUTF_SZ]; + // char outf[INOUTF_SZ]; char outregf[INOUTF_SZ]; - int res, k, err, flags; + int res, k, err; + size_t num_ifiles, num_ofiles, num_slices, inf0_sz; int64_t in_num_sect = -1; int64_t out_num_sect = -1; const char * ccp = NULL; @@ -3978,8 +4153,8 @@ main(int argc, char * argv[]) clp->cdbsz_in = DEF_SCSI_CDB_SZ; clp->cdbsz_out = DEF_SCSI_CDB_SZ; clp->mrq_num = DEF_MRQ_NUM; - inf[0] = '\0'; - outf[0] = '\0'; + // inf[0] = '\0'; + // outf[0] = '\0'; outregf[0] = '\0'; fetch_sg_version(); if (sg_version >= 40045) @@ -3990,7 +4165,7 @@ main(int argc, char * argv[]) fail_after_cli = true; } - res = parse_cmdline_sanity(argc, argv, clp, inf, outf, outregf); + res = parse_cmdline_sanity(argc, argv, clp, outregf); if (SG_LIB_OK_FALSE == res) return 0; if (res) @@ -4007,8 +4182,63 @@ main(int argc, char * argv[]) install_handler(SIGUSR1, siginfo_handler); install_handler(SIGUSR2, siginfo2_handler); - clp->infd = STDIN_FILENO; - clp->outfd = STDOUT_FILENO; + num_ifiles = clp->inf_v.size(); + num_ofiles = clp->outf_v.size(); + if (num_ifiles > MAX_SLICES) { + pr2serr("%sonly support %d slices but given %zd IFILEs\n", my_name, + MAX_SLICES, num_ifiles); + return SG_LIB_SYNTAX_ERROR; + } + if (num_ofiles > MAX_SLICES) { + pr2serr("%sonly support %d slices but given %zd OFILEs\n", my_name, + MAX_SLICES, num_ifiles); + return SG_LIB_SYNTAX_ERROR; + } + if (0 == num_ofiles) { + if (0 == num_ifiles) { + pr2serr("%sexpect either if= or of= to be given\n", my_name); + return SG_LIB_SYNTAX_ERROR; + } + for (k = 0; k < (int)num_ifiles; ++k) + clp->outf_v.push_back("."); /* same as /dev/null */ + } + if (0 == num_ifiles) { + ifile_given = false; + for (k = 0; k < (int)num_ofiles; ++k) + clp->inf_v.push_back(""); + } + if ((num_ifiles > 1) && (num_ofiles > 1) && (num_ifiles != num_ofiles)) { + pr2serr("%snumber of IFILEs [%zd] and number of OFILEs [%zd] > 1 " + "and unequal\n", my_name, num_ifiles, num_ofiles); + return SG_LIB_SYNTAX_ERROR; + } + if ((num_ifiles > 1) && (1 == num_ofiles)) { + /* if many IFILEs and one OFILE, replicate OFILE till same size */ + for (k = 1; k < (int)num_ifiles; ++k) + clp->outf_v.push_back(clp->outf_v[0]); + num_ofiles = clp->outf_v.size(); + } else if ((num_ofiles > 1) && (1 == num_ifiles)) { + /* if many OFILEs and one IFILE, replicate IFILE till same size */ + for (k = 1; k < (int)num_ofiles; ++k) + clp->inf_v.push_back(clp->inf_v[0]); + num_ifiles = clp->inf_v.size(); + } + num_slices = (num_ifiles > num_ofiles) ? num_ifiles : num_ofiles; + if ((int)num_slices > num_threads) { + pr2serr("%sNumber of slices [%zd] exceeds number of threads [%d].\n", + my_name, num_slices, num_threads); + pr2serr("Number of threads needs to be increased.\n"); + return SG_LIB_SYNTAX_ERROR; + } + k = 0; + for (auto && cvp : clp->cp_ver_arr) { + if (k >= (int)num_slices) + break; + cvp.my_index = k++; + cvp.state = cp_ver_pair_t::my_state::init; + } + clp->in0fd = STDIN_FILENO; + clp->out0fd = STDOUT_FILENO; if (clp->in_flags.ff) { ccp = "<0xff bytes>"; cc2p = "ff"; @@ -4019,45 +4249,45 @@ main(int argc, char * argv[]) ccp = "<zero bytes>"; cc2p = "00"; } + inf0_sz = clp->inf_v.size() ? clp->inf_v[0].size() : 0; if (ccp) { - if (inf[0]) { - pr2serr("%siflag=%s and if=%s contradict\n", my_name, cc2p, inf); + if (ifile_given) { + pr2serr("%siflag=%s and if=%s contradict\n", my_name, cc2p, + clp->inf_v[0].data()); return SG_LIB_CONTRADICT; } + for (auto && cvp : clp->cp_ver_arr) { + if (cvp.state == cp_ver_pair_t::my_state::empty) + break; + cvp.in_type = FT_RANDOM_0_FF; + } clp->in_type = FT_RANDOM_0_FF; clp->infp = ccp; - clp->infd = -1; - } else if (inf[0] && ('-' != inf[0])) { - clp->in_type = dd_filetype(inf, clp->in_st_size); + clp->in0fd = -1; + } else if (inf0_sz && ('-' != clp->inf_v[0].data()[0])) { + const string & inf_s = clp->inf_v[0]; + const char * infp = inf_s.data(); + clp->in_type = dd_filetype(infp, clp->in_st_size); if (FT_ERROR == clp->in_type) { - pr2serr("%sunable to access %s\n", my_name, inf); + pr2serr("%sunable to access %s\n", my_name, infp); return SG_LIB_FILE_ERROR; } else if (FT_ST == clp->in_type) { - pr2serr("%sunable to use scsi tape device %s\n", my_name, inf); + pr2serr("%sunable to use scsi tape device %s\n", my_name, infp); + return SG_LIB_FILE_ERROR; + } else if (FT_CHAR == clp->in_type) { + pr2serr("%sunable to use unknown char device %s\n", my_name, infp); return SG_LIB_FILE_ERROR; } else if (FT_SG == clp->in_type) { - clp->infd = sg_in_open(clp, inf, NULL, NULL, false); - if (clp->infd < 0) - return -clp->infd; + clp->in0fd = sg_in_open(clp, inf_s, NULL, NULL); + if (clp->in0fd < 0) + return -clp->in0fd; } else { - flags = O_RDONLY; - if (clp->in_flags.direct) - flags |= O_DIRECT; - if (clp->in_flags.excl) - flags |= O_EXCL; - if (clp->in_flags.dsync) - flags |= O_SYNC; - - if ((clp->infd = open(inf, flags)) < 0) { - err = errno; - snprintf(ebuff, EBUFF_SZ, "%scould not open %s for reading", - my_name, inf); - perror(ebuff); - return sg_convert_errno(err); - } + clp->in0fd = reg_file_open(clp, infp, false /* read */); + if (clp->in0fd < 0) + return sg_convert_errno(-clp->in0fd); } - clp->infp = inf; + clp->infp = infp; } if (clp->cdl_given && (! clp->cdbsz_given)) { bool changed = false; @@ -4077,12 +4307,15 @@ main(int argc, char * argv[]) if ((clp->verbose > 0) && (clp->in_flags.no_waitq || clp->out_flags.no_waitq)) pr2serr("no_waitq=<n> operand is now ignored\n"); - if (outf[0]) { + if (clp->outf_v.size()) { + const string & outf_s = clp->outf_v[0].data(); + const char * outfp = outf_s.data(); + clp->ofile_given = true; - if (('-' == outf[0])) + if ('-' == outfp[0]) clp->out_type = FT_FIFO; else - clp->out_type = dd_filetype(outf, clp->out_st_size); + clp->out_type = dd_filetype(outfp, clp->out_st_size); if ((FT_SG != clp->out_type) && clp->verify) { pr2serr("%s --verify only supported by sg OFILEs\n", my_name); @@ -4091,47 +4324,24 @@ main(int argc, char * argv[]) if (FT_FIFO == clp->out_type) ; else if (FT_ST == clp->out_type) { - pr2serr("%sunable to use scsi tape device %s\n", my_name, outf); + pr2serr("%sunable to use scsi tape device %s\n", my_name, outfp); + return SG_LIB_FILE_ERROR; + } else if (FT_CHAR == clp->out_type) { + pr2serr("%sunable to use unknown char device %s\n", my_name, + outfp); return SG_LIB_FILE_ERROR; } else if (FT_SG == clp->out_type) { - clp->outfd = sg_out_open(clp, outf, NULL, NULL, false); - if (clp->outfd < 0) - return -clp->outfd; + clp->out0fd = sg_out_open(clp, outf_s, NULL, NULL); + if (clp->out0fd < 0) + return -clp->out0fd; } else if (FT_DEV_NULL == clp->out_type) - clp->outfd = -1; /* don't bother opening */ + clp->out0fd = -1; /* don't bother opening */ else { - if (FT_RAW != clp->out_type) { - flags = O_WRONLY; - if (! clp->out_flags.nocreat) - flags |= O_CREAT; - if (clp->out_flags.direct) - flags |= O_DIRECT; - if (clp->out_flags.excl) - flags |= O_EXCL; - if (clp->out_flags.dsync) - flags |= O_SYNC; - if (clp->out_flags.append) - flags |= O_APPEND; - - if ((clp->outfd = open(outf, flags, 0666)) < 0) { - err = errno; - snprintf(ebuff, EBUFF_SZ, "%scould not open %s for " - "writing", my_name, outf); - perror(ebuff); - return sg_convert_errno(err); - } - } - else { /* raw output file */ - if ((clp->outfd = open(outf, O_WRONLY)) < 0) { - err = errno; - snprintf(ebuff, EBUFF_SZ, "%scould not open %s for raw " - "writing", my_name, outf); - perror(ebuff); - return sg_convert_errno(err); - } - } + clp->out0fd = reg_file_open(clp, outfp, true /* write */); + if (clp->out0fd < 0) + return sg_convert_errno(-clp->out0fd); } - clp->outfp = outf; + clp->outfp = outfp; } if (clp->verify && (clp->out_type == FT_DEV_NULL)) { pr2serr("Can't do verify when OFILE not given\n"); @@ -4172,7 +4382,7 @@ main(int argc, char * argv[]) } else clp->outregfd = -1; - if ((STDIN_FILENO == clp->infd) && (STDOUT_FILENO == clp->outfd)) { + if ((STDIN_FILENO == clp->in0fd) && (STDOUT_FILENO == clp->out0fd)) { pr2serr("Won't default both IFILE to stdin _and_ OFILE to " "/dev/null\n"); pr2serr("For more information use '--help'\n"); @@ -4186,7 +4396,8 @@ main(int argc, char * argv[]) pr2serr("The seek= argument is not suitable for a pipe\n"); return SG_LIB_SYNTAX_ERROR; } - res = do_count_work(clp, inf, in_num_sect, outf, out_num_sect); + res = do_count_work(clp, clp->inf_v[0].data(), in_num_sect, + clp->outf_v[0].data(), out_num_sect); if (res) return res; @@ -4213,8 +4424,13 @@ main(int argc, char * argv[]) } } - clp->in_rem_count = clp->dd_count; - clp->out_rem_count = clp->dd_count; + for (auto && cvp : clp->cp_ver_arr) { + cvp.in_type = clp->in_type; + cvp.out_type = clp->out_type; + cvp.dd_count = clp->dd_count; + cvp.in_rem_count = clp->dd_count; + cvp.out_rem_count = clp->dd_count; + } if (clp->dry_run > 0) { pr2serr("Due to --dry-run option, bypass copy/read\n"); @@ -4243,13 +4459,18 @@ main(int argc, char * argv[]) /* vvvvvvvvvvv Start worker threads vvvvvvvvvvvvvvvvvvvvvvvv */ if (num_threads > 0) { + auto & cvp = clp->cp_ver_arr[0]; + + cvp.in_fd = clp->in0fd; + cvp.out_fd = clp->out0fd; + /* launch "infant" thread to catch early mortality, if any */ - work_thr.emplace_back(read_write_thread, clp, 0, true); + work_thr.emplace_back(read_write_thread, clp, 0, 0, true); { unique_lock<mutex> lk(clp->infant_mut); clp->infant_cv.wait(lk, []{ return gcoll.processed; }); } - if (clp->next_count_pos.load() < 0) { + if (clp->cp_ver_arr[0].next_count_pos.load() < 0) { /* infant thread error-ed out, join with it */ for (auto & t : work_thr) { if (t.joinable()) @@ -4260,7 +4481,8 @@ main(int argc, char * argv[]) /* now start the rest of the threads */ for (k = 1; k < num_threads; ++k) - work_thr.emplace_back(read_write_thread, clp, k, false); + work_thr.emplace_back(read_write_thread, clp, k, + k % (int)num_slices, false); /* now wait for worker threads to finish */ for (auto & t : work_thr) { @@ -4274,11 +4496,12 @@ jump: if (do_sync) { if (FT_SG == clp->out_type) { - pr2serr_lk(">> Synchronizing cache on %s\n", outf); - res = sg_ll_sync_cache_10(clp->outfd, 0, 0, 0, 0, 0, false, 0); + pr2serr_lk(">> Synchronizing cache on %s\n", + (clp->outf_v.size() ? clp->outf_v[0].data() : "" )); + res = sg_ll_sync_cache_10(clp->out0fd, 0, 0, 0, 0, 0, false, 0); if (SG_LIB_CAT_UNIT_ATTENTION == res) { pr2serr_lk("Unit attention(out), continuing\n"); - res = sg_ll_sync_cache_10(clp->outfd, 0, 0, 0, 0, 0, false, + res = sg_ll_sync_cache_10(clp->out0fd, 0, 0, 0, 0, 0, false, 0); } if (0 != res) @@ -4296,11 +4519,11 @@ jump: fini: - if ((STDIN_FILENO != clp->infd) && (clp->infd >= 0)) - close(clp->infd); - if ((STDOUT_FILENO != clp->outfd) && (FT_DEV_NULL != clp->out_type) && - (clp->outfd >= 0)) - close(clp->outfd); + if ((STDIN_FILENO != clp->in0fd) && (clp->in0fd >= 0)) + close(clp->in0fd); + if ((STDOUT_FILENO != clp->out0fd) && (FT_DEV_NULL != clp->out_type) && + (clp->out0fd >= 0)) + close(clp->out0fd); if ((clp->outregfd >= 0) && (STDOUT_FILENO != clp->outregfd) && (FT_DEV_NULL != clp->outreg_type)) close(clp->outregfd); @@ -4320,9 +4543,16 @@ fini: close(fd); } } - if (clp->sum_of_resids.load()) - pr2serr(">> Non-zero sum of residual counts=%d\n", - clp->sum_of_resids.load()); + + k = 0; + for (auto && cvp : gcoll.cp_ver_arr) { + if (cvp.state == cp_ver_pair_t::my_state::empty) + break; + ++k; + if (cvp.sum_of_resids.load()) + pr2serr(">> slice: %d, Non-zero sum of residual counts=%d\n", + k, cvp.sum_of_resids.load()); + } if (clp->verbose && (num_start_eagain > 0)) pr2serr("Number of start EAGAINs: %d\n", num_start_eagain.load()); if (clp->verbose && (num_fin_eagain > 0)) |