diff options
Diffstat (limited to 'test_conformance/run_conformance.py')
-rwxr-xr-x | test_conformance/run_conformance.py | 585 |
1 files changed, 297 insertions, 288 deletions
diff --git a/test_conformance/run_conformance.py b/test_conformance/run_conformance.py index ea7f6775..974491e1 100755 --- a/test_conformance/run_conformance.py +++ b/test_conformance/run_conformance.py @@ -8,295 +8,304 @@ #// #******************************************************************/ -import os, re, sys, subprocess, time, commands, tempfile, math, string +from __future__ import print_function + +import os +import re +import sys +import subprocess +import time +import tempfile DEBUG = 0 -log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime())+ ".log" +log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime()) + ".log" process_pid = 0 # The amount of time between printing a "." (if no output from test) or ":" (if output) # to the screen while the tests are running. -seconds_between_status_updates = 60*60*24*7 # effectively never +seconds_between_status_updates = 60 * 60 * 24 * 7 # effectively never # Help info -def write_help_info() : - print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]") - print(" test_list - the .csv file containing the test names and commands to run the tests.") - print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.") - print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.") - print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.") - print(" (Note: spaces are not allowed in the log file path.") +def write_help_info(): + print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]") + print(" test_list - the .csv file containing the test names and commands to run the tests.") + print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.") + print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.") + print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.") + print(" (Note: spaces are not allowed in the log file path.") # Get the time formatted nicely -def get_time() : - return time.strftime("%d-%b %H:%M:%S", time.localtime()) +def get_time(): + return time.strftime("%d-%b %H:%M:%S", time.localtime()) + # Write text to the screen and the log file -def write_screen_log(text) : - global log_file - print(text) - log_file.write(text+"\n") +def write_screen_log(text): + global log_file + print(text) + log_file.write(text + "\n") + # Load the tests from a csv formated file of the form name,command def get_tests(filename, devices_to_test): - tests = [] - if (os.path.exists(filename) == False): - print("FAILED: test_list \"" + filename + "\" does not exist.") - print("") - write_help_info() - sys.exit(-1) - file = open(filename, 'r') - for line in file.readlines(): - comment = re.search("^#.*", line) - if (comment): - continue - device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line) - if (device_specific_match): - if (device_specific_match.group(1) in devices_to_test): - test_path = string.replace(device_specific_match.group(3), '/', os.sep) - test_name = string.replace(device_specific_match.group(2), '/', os.sep) - tests.append((test_name, test_path)) - else: - print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.") - continue - match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line) - if (match): - test_path = string.replace(match.group(2), '/', os.sep) - test_name = string.replace(match.group(1), '/', os.sep) - tests.append((test_name, test_path)) - return tests + tests = [] + if os.path.exists(filename) == False: + print("FAILED: test_list \"" + filename + "\" does not exist.") + print("") + write_help_info() + sys.exit(-1) + file = open(filename, 'r') + for line in file.readlines(): + comment = re.search("^#.*", line) + if comment: + continue + device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line) + if device_specific_match: + if device_specific_match.group(1) in devices_to_test: + test_path = str.replace(device_specific_match.group(3), '/', os.sep) + test_name = str.replace(device_specific_match.group(2), '/', os.sep) + tests.append((test_name, test_path)) + else: + print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.") + continue + match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line) + if match: + test_path = str.replace(match.group(2), '/', os.sep) + test_name = str.replace(match.group(1), '/', os.sep) + tests.append((test_name, test_path)) + return tests def run_test_checking_output(current_directory, test_dir, log_file): - global process_pid, seconds_between_status_updates - failures_this_run = 0 - start_time = time.time() - # Create a temporary file for capturing the output from the test - (output_fd, output_name) = tempfile.mkstemp() - if ( not os.path.exists(output_name)) : - write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name) - os.close(output_fd) - return -1 - # Execute the test - program_to_run = test_dir_without_args = test_dir.split(None, 1)[0] - if ( os.sep == '\\' ) : program_to_run += ".exe" - if (os.path.exists(current_directory + os.sep + program_to_run)) : - os.chdir(os.path.dirname(current_directory+os.sep+test_dir_without_args) ) - try: - if (DEBUG): p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) - else : p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True) - except OSError: - write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError)) - os.close(output_fd) - return -1 - else: - write_screen_log("\n ==> ERROR: test file (" + current_directory + os.sep + program_to_run +") does not exist. Failing test.") - os.close(output_fd) - return -1 - # Set the global pid so we can kill it if this is aborted - process_pid = p.pid - # Read one character at a time from the temporary output file while the process is running. - # When we get an end-of-line, look for errors and write the results to the log file. - # This allows us to process the file as it is being produced. - # Keep track of the state for reading - # Whether we are done, if we have more to read, and where in the file we last read - done = False - more_to_read = True - pointer = 0 - pointer_at_last_user_update = 0 - output_this_run = False - try: - read_output = open(output_name, 'r') - except IOError: - write_screen_log("\n ==> ERROR: could not open output file from test.") - os.close(output_fd) - return -1 - line = "" - while (not done or more_to_read): - os.fsync(output_fd) - # Determine if we should display some output - elapsed_time = (time.time() - start_time) - if (elapsed_time > seconds_between_status_updates): - start_time = time.time() - # If we've received output from the test since the last update, display a # - if (pointer != pointer_at_last_user_update): - sys.stdout.write(":") - else: - sys.stdout.write(".") - pointer_at_last_user_update = pointer - sys.stdout.flush() - # Check if we're done - p.poll() - if (not done and p.returncode != None): - if (p.returncode < 0): - if (not output_this_run): - print "" - output_this_run = True - write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode)+ ".") - done = True - # Try reading + global process_pid, seconds_between_status_updates + failures_this_run = 0 + start_time = time.time() + # Create a temporary file for capturing the output from the test + (output_fd, output_name) = tempfile.mkstemp() + if not os.path.exists(output_name): + write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name) + os.close(output_fd) + return -1 + # Execute the test + program_to_run = test_dir_without_args = test_dir.split(None, 1)[0] + if os.sep == '\\': + program_to_run += ".exe" + if os.path.exists(current_directory + os.sep + program_to_run): + os.chdir(os.path.dirname(current_directory + os.sep + test_dir_without_args)) + try: + if DEBUG: p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) + else: p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True) + except OSError: + write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError)) + os.close(output_fd) + return -1 + else: + write_screen_log("\n ==> ERROR: test file (" + current_directory + os.sep + program_to_run + ") does not exist. Failing test.") + os.close(output_fd) + return -1 + # Set the global pid so we can kill it if this is aborted + process_pid = p.pid + # Read one character at a time from the temporary output file while the process is running. + # When we get an end-of-line, look for errors and write the results to the log file. + # This allows us to process the file as it is being produced. + # Keep track of the state for reading + # Whether we are done, if we have more to read, and where in the file we last read + done = False + more_to_read = True + pointer = 0 + pointer_at_last_user_update = 0 + output_this_run = False try: - read_output.seek(pointer) - char_read = read_output.read(1) - except IOError: - time.sleep(1) - continue - # If we got a full line then process it - if (char_read == "\n"): - # Look for failures and report them as such - match = re.search(".*(FAILED|ERROR).*", line) - if (match): - if (not output_this_run): - print "" - output_this_run = True - print(" ==> " + line.replace('\n','')) - match = re.search(".*FAILED.*", line) - if (match): - failures_this_run = failures_this_run + 1 - match = re.search(".*(PASSED).*", line) - if (match): - if (not output_this_run): - print "" - output_this_run = True - print(" " + line.replace('\n','')) - # Write it to the log - log_file.write(" " + line +"\n") - log_file.flush() - line = "" - pointer = pointer + 1 - # If we are at the end of the file, then re-open it to get new data - elif (char_read == ""): - more_to_read = False - read_output.close() - time.sleep(1) - try: - os.fsync(output_fd) read_output = open(output_name, 'r') - # See if there is more to read. This happens if the process ends and we have data left. - read_output.seek(pointer) - if (read_output.read(1) != ""): - more_to_read = True - except IOError: - write_screen_log("\n ==> ERROR: could not reopen output file from test.") + except IOError: + write_screen_log("\n ==> ERROR: could not open output file from test.") + os.close(output_fd) return -1 - done = True - else: - line = line + char_read - pointer = pointer + 1 - # Now we are done, so write out any remaining data in the file: - # This should only happen if the process exited with an error. - os.fsync(output_fd) - while (read_output.read(1) != ""): - log_file.write(read_output.read(1)) - # Return the total number of failures - if (p.returncode == 0 and failures_this_run > 0): - write_screen_log("\n ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) +".") - return failures_this_run - return p.returncode - - -def run_tests(tests) : - global curent_directory - global process_pid - # Run the tests - failures = 0 - previous_test = None - test_number = 1 - for test in tests: - # Print the name of the test we're running and the time - (test_name, test_dir) = test - if (test_dir != previous_test): - print("========== " + test_dir) - log_file.write("========================================================================================\n") - log_file.write("========================================================================================\n") - log_file.write("(" + get_time() + ") Running Tests: " + test_dir +"\n") - log_file.write("========================================================================================\n") - log_file.write("========================================================================================\n") - previous_test = test_dir - print("("+get_time()+") BEGIN " + test_name.ljust(40) +": "), - log_file.write(" ----------------------------------------------------------------------------------------\n") - log_file.write(" (" + get_time() + ") Running Sub Test: " + test_name + "\n") - log_file.write(" ----------------------------------------------------------------------------------------\n") - log_file.flush() - sys.stdout.flush() - - # Run the test - result = 0 - start_time = time.time() - try: - process_pid = 0 - result = run_test_checking_output(current_directory, test_dir, log_file) - except KeyboardInterrupt: - # Catch an interrupt from the user - write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.") - os.kill(process_pid, 9) - answer = raw_input("Abort all tests? (y/n)") - if (answer.find("y") != -1): - write_screen_log("\nUser chose to abort all tests.") - log_file.close() - sys.exit(-1) - else: - write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.") - result = 1 - run_time = (time.time() - start_time) - - # Move print the finish status - if (result == 0): - print("("+get_time()+") PASSED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"), - else: - print("("+get_time()+") FAILED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"), - - test_number = test_number + 1 - log_file.write(" ----------------------------------------------------------------------------------------\n") - log_file.flush() - - print("") - if (result != 0): - log_file.write(" *******************************************************************************************\n") - log_file.write(" * ("+get_time()+") Test " + test_name + " ==> FAILED: " + str(result)+"\n") - log_file.write(" *******************************************************************************************\n") - failures = failures + 1 - else: - log_file.write(" ("+get_time()+") Test " + test_name +" passed in " + str(run_time) + "s\n") - - log_file.write(" ----------------------------------------------------------------------------------------\n") - log_file.write("\n") - return failures - - - + line = "" + while not done or more_to_read: + os.fsync(output_fd) + # Determine if we should display some output + elapsed_time = (time.time() - start_time) + if elapsed_time > seconds_between_status_updates: + start_time = time.time() + # If we've received output from the test since the last update, display a # + if pointer != pointer_at_last_user_update: + sys.stdout.write(":") + else: + sys.stdout.write(".") + pointer_at_last_user_update = pointer + sys.stdout.flush() + # Check if we're done + p.poll() + if not done and p.returncode != None: + if p.returncode < 0: + if not output_this_run: + print("") + output_this_run = True + write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode) + ".") + done = True + # Try reading + try: + read_output.seek(pointer) + char_read = read_output.read(1) + except IOError: + time.sleep(1) + continue + # If we got a full line then process it + if char_read == "\n": + # Look for failures and report them as such + match = re.search(".*(FAILED|ERROR).*", line) + if match: + if not output_this_run: + print("") + output_this_run = True + print(" ==> " + line.replace('\n', '')) + match = re.search(".*FAILED.*", line) + if match: + failures_this_run = failures_this_run + 1 + match = re.search(".*(PASSED).*", line) + if match: + if not output_this_run: + print("") + output_this_run = True + print(" " + line.replace('\n', '')) + # Write it to the log + log_file.write(" " + line + "\n") + log_file.flush() + line = "" + pointer = pointer + 1 + # If we are at the end of the file, then re-open it to get new data + elif char_read == "": + more_to_read = False + read_output.close() + time.sleep(1) + try: + os.fsync(output_fd) + read_output = open(output_name, 'r') + # See if there is more to read. This happens if the process ends and we have data left. + read_output.seek(pointer) + if read_output.read(1) != "": + more_to_read = True + except IOError: + write_screen_log("\n ==> ERROR: could not reopen output file from test.") + return -1 + else: + line = line + char_read + pointer = pointer + 1 + # Now we are done, so write out any remaining data in the file: + # This should only happen if the process exited with an error. + os.fsync(output_fd) + while read_output.read(1) != "": + log_file.write(read_output.read(1)) + # Return the total number of failures + if (p.returncode == 0 and failures_this_run > 0): + write_screen_log("\n ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) + ".") + return failures_this_run + return p.returncode + + +def run_tests(tests): + global curent_directory + global process_pid + # Run the tests + failures = 0 + previous_test = None + test_number = 1 + for test in tests: + # Print the name of the test we're running and the time + (test_name, test_dir) = test + if test_dir != previous_test: + print("========== " + test_dir) + log_file.write("========================================================================================\n") + log_file.write("========================================================================================\n") + log_file.write("(" + get_time() + ") Running Tests: " + test_dir + "\n") + log_file.write("========================================================================================\n") + log_file.write("========================================================================================\n") + previous_test = test_dir + print("(" + get_time() + ") BEGIN " + test_name.ljust(40) + ": ", end='') + log_file.write(" ----------------------------------------------------------------------------------------\n") + log_file.write(" (" + get_time() + ") Running Sub Test: " + test_name + "\n") + log_file.write(" ----------------------------------------------------------------------------------------\n") + log_file.flush() + sys.stdout.flush() + + # Run the test + result = 0 + start_time = time.time() + try: + process_pid = 0 + result = run_test_checking_output(current_directory, test_dir, log_file) + except KeyboardInterrupt: + # Catch an interrupt from the user + write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.") + os.kill(process_pid, 9) + if sys.version_info[0] < 3: + answer = raw_input("Abort all tests? (y/n)") + else: + answer = input("Abort all tests? (y/n)") + if answer.find("y") != -1: + write_screen_log("\nUser chose to abort all tests.") + log_file.close() + sys.exit(-1) + else: + write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.") + result = 1 + run_time = (time.time() - start_time) + + # Move print the finish status + if result == 0: + print("(" + get_time() + ") PASSED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='') + else: + print("(" + get_time() + ") FAILED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='') + + test_number = test_number + 1 + log_file.write(" ----------------------------------------------------------------------------------------\n") + log_file.flush() + + print("") + if result != 0: + log_file.write(" *******************************************************************************************\n") + log_file.write(" * (" + get_time() + ") Test " + test_name + " ==> FAILED: " + str(result) + "\n") + log_file.write(" *******************************************************************************************\n") + failures = failures + 1 + else: + log_file.write(" (" + get_time() + ") Test " + test_name + " passed in " + str(run_time) + "s\n") + + log_file.write(" ----------------------------------------------------------------------------------------\n") + log_file.write("\n") + return failures # ######################## # Begin OpenCL conformance run script # ######################## -if (len(sys.argv) < 2): - write_help_info() - sys.exit(-1) - +if len(sys.argv) < 2: + write_help_info() + sys.exit(-1) current_directory = os.getcwd() # Open the log file for arg in sys.argv: - match = re.search("log=(\S+)", arg) - if (match): - log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name + match = re.search("log=(\S+)", arg) + if match: + log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name try: - log_file = open(log_file_name, "w") + log_file = open(log_file_name, "w") except IOError: - print "Could not open log file " + log_file_name + print("Could not open log file " + log_file_name) + sys.exit(-1) # Determine which devices to test device_types = ["CL_DEVICE_TYPE_DEFAULT", "CL_DEVICE_TYPE_CPU", "CL_DEVICE_TYPE_GPU", "CL_DEVICE_TYPE_ACCELERATOR", "CL_DEVICE_TYPE_ALL"] devices_to_test = [] for device in device_types: - if device in sys.argv[2:]: - devices_to_test.append(device) -if (len(devices_to_test) == 0): - devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"] + if device in sys.argv[2:]: + devices_to_test.append(device) +if len(devices_to_test) == 0: + devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"] write_screen_log("Testing on: " + str(devices_to_test)) # Get the tests @@ -306,52 +315,52 @@ tests = get_tests(sys.argv[1], devices_to_test) tests_to_use = [] num_of_patterns_to_match = 0 for arg in sys.argv[2:]: - if arg in device_types: - continue - if re.search("log=(\S+)", arg): - continue - num_of_patterns_to_match = num_of_patterns_to_match + 1 - found_it = False - for test in tests: - (test_name, test_dir) = test - if (test_name.find(arg) != -1 or test_dir.find(arg) != -1): - found_it = True - if (test not in tests_to_use): - tests_to_use.append(test) - if (found_it == False): - print("Failed to find a test matching " + arg) -if (len(tests_to_use) == 0): - if (num_of_patterns_to_match > 0): - print("FAILED: Failed to find any tests matching the given command-line options.") - print("") - write_help_info() - sys.exit(-1) + if arg in device_types: + continue + if re.search("log=(\S+)", arg): + continue + num_of_patterns_to_match = num_of_patterns_to_match + 1 + found_it = False + for test in tests: + (test_name, test_dir) = test + if (test_name.find(arg) != -1 or test_dir.find(arg) != -1): + found_it = True + if test not in tests_to_use: + tests_to_use.append(test) + if found_it == False: + print("Failed to find a test matching " + arg) +if len(tests_to_use) == 0: + if num_of_patterns_to_match > 0: + print("FAILED: Failed to find any tests matching the given command-line options.") + print("") + write_help_info() + sys.exit(-1) else: - tests = tests_to_use[:] + tests = tests_to_use[:] write_screen_log("Test execution arguments: " + str(sys.argv)) -write_screen_log("Logging to file " + log_file_name +".") +write_screen_log("Logging to file " + log_file_name + ".") write_screen_log("Loaded tests from " + sys.argv[1] + ", total of " + str(len(tests)) + " tests selected to run:") for (test_name, test_command) in tests: - write_screen_log(test_name.ljust(50) + " (" + test_command +")") + write_screen_log(test_name.ljust(50) + " (" + test_command + ")") # Run the tests total_failures = 0 for device_to_test in devices_to_test: - os.environ['CL_DEVICE_TYPE'] = device_to_test - write_screen_log("========================================================================================") - write_screen_log("========================================================================================") - write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90)) - write_screen_log("========================================================================================") - write_screen_log("========================================================================================") - failures = run_tests(tests) - write_screen_log("========================================================================================") - if (failures == 0): - write_screen_log(">> TEST on " + device_to_test + " PASSED") - else: - write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)") - write_screen_log("========================================================================================") - total_failures = total_failures + failures - -write_screen_log("("+get_time()+") Testing complete. " + str(total_failures) + " failures for " + str(len(tests)) + " tests.") + os.environ['CL_DEVICE_TYPE'] = device_to_test + write_screen_log("========================================================================================") + write_screen_log("========================================================================================") + write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90)) + write_screen_log("========================================================================================") + write_screen_log("========================================================================================") + failures = run_tests(tests) + write_screen_log("========================================================================================") + if failures == 0: + write_screen_log(">> TEST on " + device_to_test + " PASSED") + else: + write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)") + write_screen_log("========================================================================================") + total_failures = total_failures + failures + +write_screen_log("(" + get_time() + ") Testing complete. " + str(total_failures) + " failures for " + str(len(tests)) + " tests.") log_file.close() |