aboutsummaryrefslogtreecommitdiff
path: root/test_conformance/run_conformance.py
blob: 974491e1cfdbad437ad9c40ccc072091db088c31 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
#! /usr/bin/python

#/******************************************************************
#//
#//  OpenCL Conformance Tests
#//
#//  Copyright:  (c) 2008-2009 by Apple Inc. All Rights Reserved.
#//
#******************************************************************/

from __future__ import print_function

import os
import re
import sys
import subprocess
import time
import tempfile

DEBUG = 0

log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime()) + ".log"
process_pid = 0

# The amount of time between printing a "." (if no output from test) or ":" (if output)
#  to the screen while the tests are running.
seconds_between_status_updates = 60 * 60 * 24 * 7  # effectively never

# Help info
def write_help_info():
    print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]")
    print(" test_list - the .csv file containing the test names and commands to run the tests.")
    print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.")
    print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.")
    print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.")
    print("   (Note: spaces are not allowed in the log file path.")


# Get the time formatted nicely
def get_time():
    return time.strftime("%d-%b %H:%M:%S", time.localtime())


# Write text to the screen and the log file
def write_screen_log(text):
    global log_file
    print(text)
    log_file.write(text + "\n")


# Load the tests from a csv formated file of the form name,command
def get_tests(filename, devices_to_test):
    tests = []
    if os.path.exists(filename) == False:
        print("FAILED: test_list \"" + filename + "\" does not exist.")
        print("")
        write_help_info()
        sys.exit(-1)
    file = open(filename, 'r')
    for line in file.readlines():
        comment = re.search("^#.*", line)
        if comment:
            continue
        device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line)
        if device_specific_match:
            if device_specific_match.group(1) in devices_to_test:
                test_path = str.replace(device_specific_match.group(3), '/', os.sep)
                test_name = str.replace(device_specific_match.group(2), '/', os.sep)
                tests.append((test_name, test_path))
            else:
                print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.")
            continue
        match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line)
        if match:
            test_path = str.replace(match.group(2), '/', os.sep)
            test_name = str.replace(match.group(1), '/', os.sep)
            tests.append((test_name, test_path))
    return tests


def run_test_checking_output(current_directory, test_dir, log_file):
    global process_pid, seconds_between_status_updates
    failures_this_run = 0
    start_time = time.time()
    # Create a temporary file for capturing the output from the test
    (output_fd, output_name) = tempfile.mkstemp()
    if not os.path.exists(output_name):
        write_screen_log("\n           ==> ERROR: could not create temporary file %s ." % output_name)
        os.close(output_fd)
        return -1
    # Execute the test
    program_to_run = test_dir_without_args = test_dir.split(None, 1)[0]
    if os.sep == '\\':
        program_to_run += ".exe"
    if os.path.exists(current_directory + os.sep + program_to_run):
        os.chdir(os.path.dirname(current_directory + os.sep + test_dir_without_args))
        try:
            if DEBUG: p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
            else: p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True)
        except OSError:
            write_screen_log("\n           ==> ERROR: failed to execute test. Failing test. : " + str(OSError))
            os.close(output_fd)
            return -1
    else:
        write_screen_log("\n           ==> ERROR: test file (" + current_directory + os.sep + program_to_run + ") does not exist.  Failing test.")
        os.close(output_fd)
        return -1
    # Set the global pid so we can kill it if this is aborted
    process_pid = p.pid
    # Read one character at a time from the temporary output file while the process is running.
    # When we get an end-of-line, look for errors and write the results to the log file.
    # This allows us to process the file as it is being produced.
    # Keep track of the state for reading
    # Whether we are done, if we have more to read, and where in the file we last read
    done = False
    more_to_read = True
    pointer = 0
    pointer_at_last_user_update = 0
    output_this_run = False
    try:
        read_output = open(output_name, 'r')
    except IOError:
        write_screen_log("\n           ==> ERROR: could not open output file from test.")
        os.close(output_fd)
        return -1
    line = ""
    while not done or more_to_read:
        os.fsync(output_fd)
        # Determine if we should display some output
        elapsed_time = (time.time() - start_time)
        if elapsed_time > seconds_between_status_updates:
            start_time = time.time()
            # If we've received output from the test since the last update, display a #
            if pointer != pointer_at_last_user_update:
                sys.stdout.write(":")
            else:
                sys.stdout.write(".")
            pointer_at_last_user_update = pointer
            sys.stdout.flush()
        # Check if we're done
        p.poll()
        if not done and p.returncode != None:
            if p.returncode < 0:
                if not output_this_run:
                    print("")
                    output_this_run = True
                write_screen_log("           ==> ERROR: test killed/crashed: " + str(p.returncode) + ".")
            done = True
        # Try reading
        try:
            read_output.seek(pointer)
            char_read = read_output.read(1)
        except IOError:
            time.sleep(1)
            continue
        # If we got a full line then process it
        if char_read == "\n":
            # Look for failures and report them as such
            match = re.search(".*(FAILED|ERROR).*", line)
            if match:
                if not output_this_run:
                    print("")
                    output_this_run = True
                print("           ==> " + line.replace('\n', ''))
            match = re.search(".*FAILED.*", line)
            if match:
                failures_this_run = failures_this_run + 1
            match = re.search(".*(PASSED).*", line)
            if match:
                if not output_this_run:
                    print("")
                    output_this_run = True
                print("               " + line.replace('\n', ''))
            # Write it to the log
            log_file.write("     " + line + "\n")
            log_file.flush()
            line = ""
            pointer = pointer + 1
        # If we are at the end of the file, then re-open it to get new data
        elif char_read == "":
            more_to_read = False
            read_output.close()
            time.sleep(1)
            try:
                os.fsync(output_fd)
                read_output = open(output_name, 'r')
                # See if there is more to read. This happens if the process ends and we have data left.
                read_output.seek(pointer)
                if read_output.read(1) != "":
                    more_to_read = True
            except IOError:
                write_screen_log("\n           ==> ERROR: could not reopen output file from test.")
                return -1
        else:
            line = line + char_read
            pointer = pointer + 1
    # Now we are done, so write out any remaining data in the file:
    # This should only happen if the process exited with an error.
    os.fsync(output_fd)
    while read_output.read(1) != "":
        log_file.write(read_output.read(1))
    # Return the total number of failures
    if (p.returncode == 0 and failures_this_run > 0):
        write_screen_log("\n           ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) + ".")
        return failures_this_run
    return p.returncode


def run_tests(tests):
    global curent_directory
    global process_pid
    # Run the tests
    failures = 0
    previous_test = None
    test_number = 1
    for test in tests:
        # Print the name of the test we're running and the time
        (test_name, test_dir) = test
        if test_dir != previous_test:
            print("==========   " + test_dir)
            log_file.write("========================================================================================\n")
            log_file.write("========================================================================================\n")
            log_file.write("(" + get_time() + ")     Running Tests: " + test_dir + "\n")
            log_file.write("========================================================================================\n")
            log_file.write("========================================================================================\n")
            previous_test = test_dir
        print("(" + get_time() + ")     BEGIN  " + test_name.ljust(40) + ": ", end='')
        log_file.write("     ----------------------------------------------------------------------------------------\n")
        log_file.write("     (" + get_time() + ")     Running Sub Test: " + test_name + "\n")
        log_file.write("     ----------------------------------------------------------------------------------------\n")
        log_file.flush()
        sys.stdout.flush()

        # Run the test
        result = 0
        start_time = time.time()
        try:
            process_pid = 0
            result = run_test_checking_output(current_directory, test_dir, log_file)
        except KeyboardInterrupt:
            # Catch an interrupt from the user
            write_screen_log("\nFAILED: Execution interrupted.  Killing test process, but not aborting full test run.")
            os.kill(process_pid, 9)
            if sys.version_info[0] < 3:
                answer = raw_input("Abort all tests? (y/n)")
            else:
                answer = input("Abort all tests? (y/n)")
            if answer.find("y") != -1:
                write_screen_log("\nUser chose to abort all tests.")
                log_file.close()
                sys.exit(-1)
            else:
                write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.")
                result = 1
        run_time = (time.time() - start_time)

        # Move print the finish status
        if result == 0:
            print("(" + get_time() + ")     PASSED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='')
        else:
            print("(" + get_time() + ")     FAILED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='')

        test_number = test_number + 1
        log_file.write("     ----------------------------------------------------------------------------------------\n")
        log_file.flush()

        print("")
        if result != 0:
            log_file.write("  *******************************************************************************************\n")
            log_file.write("  *  (" + get_time() + ")     Test " + test_name + " ==> FAILED: " + str(result) + "\n")
            log_file.write("  *******************************************************************************************\n")
            failures = failures + 1
        else:
            log_file.write("     (" + get_time() + ")     Test " + test_name + " passed in " + str(run_time) + "s\n")

        log_file.write("     ----------------------------------------------------------------------------------------\n")
        log_file.write("\n")
    return failures


# ########################
# Begin OpenCL conformance run script
# ########################

if len(sys.argv) < 2:
    write_help_info()
    sys.exit(-1)

current_directory = os.getcwd()
# Open the log file
for arg in sys.argv:
    match = re.search("log=(\S+)", arg)
    if match:
        log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name
try:
    log_file = open(log_file_name, "w")
except IOError:
    print("Could not open log file " + log_file_name)
    sys.exit(-1)

# Determine which devices to test
device_types = ["CL_DEVICE_TYPE_DEFAULT", "CL_DEVICE_TYPE_CPU", "CL_DEVICE_TYPE_GPU", "CL_DEVICE_TYPE_ACCELERATOR", "CL_DEVICE_TYPE_ALL"]
devices_to_test = []
for device in device_types:
    if device in sys.argv[2:]:
        devices_to_test.append(device)
if len(devices_to_test) == 0:
    devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"]
write_screen_log("Testing on: " + str(devices_to_test))

# Get the tests
tests = get_tests(sys.argv[1], devices_to_test)

# If tests are specified on the command line then run just those ones
tests_to_use = []
num_of_patterns_to_match = 0
for arg in sys.argv[2:]:
    if arg in device_types:
        continue
    if re.search("log=(\S+)", arg):
        continue
    num_of_patterns_to_match = num_of_patterns_to_match + 1
    found_it = False
    for test in tests:
        (test_name, test_dir) = test
        if (test_name.find(arg) != -1 or test_dir.find(arg) != -1):
            found_it = True
            if test not in tests_to_use:
                tests_to_use.append(test)
    if found_it == False:
        print("Failed to find a test matching " + arg)
if len(tests_to_use) == 0:
    if num_of_patterns_to_match > 0:
        print("FAILED: Failed to find any tests matching the given command-line options.")
        print("")
        write_help_info()
        sys.exit(-1)
else:
    tests = tests_to_use[:]

write_screen_log("Test execution arguments: " + str(sys.argv))
write_screen_log("Logging to file " + log_file_name + ".")
write_screen_log("Loaded tests from " + sys.argv[1] + ", total of " + str(len(tests)) + " tests selected to run:")
for (test_name, test_command) in tests:
    write_screen_log(test_name.ljust(50) + " (" + test_command + ")")

# Run the tests
total_failures = 0
for device_to_test in devices_to_test:
    os.environ['CL_DEVICE_TYPE'] = device_to_test
    write_screen_log("========================================================================================")
    write_screen_log("========================================================================================")
    write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90))
    write_screen_log("========================================================================================")
    write_screen_log("========================================================================================")
    failures = run_tests(tests)
    write_screen_log("========================================================================================")
    if failures == 0:
        write_screen_log(">> TEST on " + device_to_test + " PASSED")
    else:
        write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)")
    write_screen_log("========================================================================================")
    total_failures = total_failures + failures

write_screen_log("(" + get_time() + ") Testing complete.  " + str(total_failures) + " failures for " + str(len(tests)) + " tests.")
log_file.close()