diff options
Diffstat (limited to 'devlib/utils/android.py')
-rw-r--r-- | devlib/utils/android.py | 207 |
1 files changed, 185 insertions, 22 deletions
diff --git a/devlib/utils/android.py b/devlib/utils/android.py index d2a1629..bdf2854 100644 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -20,15 +20,20 @@ Utility functions for working with Android devices through adb. """ # pylint: disable=E1103 import os +import pexpect import time import subprocess import logging import re +import threading +import tempfile +import Queue from collections import defaultdict from devlib.exception import TargetError, HostError, DevlibError -from devlib.utils.misc import check_output, which, memoized +from devlib.utils.misc import check_output, which, memoized, ABI_MAP from devlib.utils.misc import escape_single_quotes, escape_double_quotes +from devlib import host logger = logging.getLogger('android') @@ -124,13 +129,18 @@ class ApkInfo(object): self.label = None self.version_name = None self.version_code = None + self.native_code = None self.parse(path) def parse(self, apk_path): _check_env() command = [aapt, 'dump', 'badging', apk_path] logger.debug(' '.join(command)) - output = subprocess.check_output(command) + try: + output = subprocess.check_output(command, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise HostError('Error parsing APK file {}. `aapt` says:\n{}' + .format(apk_path, e.output)) for line in output.split('\n'): if line.startswith('application-label:'): self.label = line.split(':')[1].strip().replace('\'', '') @@ -143,6 +153,19 @@ class ApkInfo(object): elif line.startswith('launchable-activity:'): match = self.name_regex.search(line) self.activity = match.group('name') + elif line.startswith('native-code'): + apk_abis = [entry.strip() for entry in line.split(':')[1].split("'") if entry.strip()] + mapped_abis = [] + for apk_abi in apk_abis: + found = False + for abi, architectures in ABI_MAP.iteritems(): + if apk_abi in architectures: + mapped_abis.append(abi) + found = True + break + if not found: + mapped_abis.append(apk_abi) + self.native_code = mapped_abis else: pass # not interested @@ -163,7 +186,7 @@ class AdbConnection(object): @memoized def newline_separator(self): output = adb_command(self.device, - "shell '({}); echo \"\n$?\"'".format(self.ls_command)) + "shell '({}); echo \"\n$?\"'".format(self.ls_command), adb_server=self.adb_server) if output.endswith('\r\n'): return '\r\n' elif output.endswith('\n'): @@ -178,7 +201,7 @@ class AdbConnection(object): def _setup_ls(self): command = "shell '(ls -1); echo \"\n$?\"'" try: - output = adb_command(self.device, command, timeout=self.timeout) + output = adb_command(self.device, command, timeout=self.timeout, adb_server=self.adb_server) except subprocess.CalledProcessError as e: raise HostError( 'Failed to set up ls command on Android device. Output:\n' @@ -191,11 +214,12 @@ class AdbConnection(object): self.ls_command = 'ls' logger.debug("ls command is set to {}".format(self.ls_command)) - def __init__(self, device=None, timeout=None, platform=None): + def __init__(self, device=None, timeout=None, platform=None, adb_server=None): self.timeout = timeout if timeout is not None else self.default_timeout if device is None: - device = adb_get_device(timeout=timeout) + device = adb_get_device(timeout=timeout, adb_server=adb_server) self.device = device + self.adb_server = adb_server adb_connect(self.device) AdbConnection.active_connections[self.device] += 1 self._setup_ls() @@ -206,7 +230,7 @@ class AdbConnection(object): command = "push '{}' '{}'".format(source, dest) if not os.path.exists(source): raise HostError('No such file "{}"'.format(source)) - return adb_command(self.device, command, timeout=timeout) + return adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) def pull(self, source, dest, timeout=None): if timeout is None: @@ -215,18 +239,18 @@ class AdbConnection(object): if os.path.isdir(dest) and \ ('*' in source or '?' in source): command = 'shell {} {}'.format(self.ls_command, source) - output = adb_command(self.device, command, timeout=timeout) + output = adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) for line in output.splitlines(): command = "pull '{}' '{}'".format(line.strip(), dest) - adb_command(self.device, command, timeout=timeout) + adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) return command = "pull '{}' '{}'".format(source, dest) - return adb_command(self.device, command, timeout=timeout) + return adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) def execute(self, command, timeout=None, check_exit_code=False, as_root=False, strip_colors=True): return adb_shell(self.device, command, timeout, check_exit_code, - as_root, self.newline_separator) + as_root, self.newline_separator,adb_server=self.adb_server) def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): return adb_background_shell(self.device, command, stdout, stderr, as_root) @@ -258,7 +282,7 @@ def fastboot_flash_partition(partition, path_to_image): fastboot_command(command) -def adb_get_device(timeout=None): +def adb_get_device(timeout=None, adb_server=None): """ Returns the serial number of a connected android device. @@ -267,13 +291,17 @@ def adb_get_device(timeout=None): """ # TODO this is a hacky way to issue a adb command to all listed devices + # Ensure server is started so the 'daemon started successfully' message + # doesn't confuse the parsing below + adb_command(None, 'start-server', adb_server=adb_server) + # The output of calling adb devices consists of a heading line then # a list of the devices sperated by new line # The last line is a blank new line. in otherwords, if there is a device found # then the output length is 2 + (1 for each device) start = time.time() while True: - output = adb_command(None, "devices").splitlines() # pylint: disable=E1103 + output = adb_command(None, "devices", adb_server=adb_server).splitlines() # pylint: disable=E1103 output_length = len(output) if output_length == 3: # output[1] is the 2nd line in the output which has the device name @@ -339,11 +367,14 @@ def _ping(device): def adb_shell(device, command, timeout=None, check_exit_code=False, - as_root=False, newline_separator='\r\n'): # NOQA + as_root=False, newline_separator='\r\n', adb_server=None): # NOQA _check_env() if as_root: command = 'echo \'{}\' | su'.format(escape_single_quotes(command)) - device_part = ['-s', device] if device else [] + device_part = [] + if adb_server: + device_part = ['-H', adb_server] + device_part += ['-s', device] if device else [] # On older combinations of ADB/Android versions, the adb host command always # exits with 0 if it was able to run the command on the target, even if the @@ -381,8 +412,9 @@ def adb_shell(device, command, timeout=None, check_exit_code=False, raise TargetError(message.format(re_search[0])) else: message = 'adb has returned early; did not get an exit code. '\ - 'Was kill-server invoked?' - raise TargetError(message) + 'Was kill-server invoked?\nOUTPUT:\n-----\n{}\n'\ + '-----\nERROR:\n-----\n{}\n-----' + raise TargetError(message.format(raw_output, error)) return output @@ -401,8 +433,8 @@ def adb_background_shell(device, command, return subprocess.Popen(full_command, stdout=stdout, stderr=stderr, shell=True) -def adb_list_devices(): - output = adb_command(None, 'devices') +def adb_list_devices(adb_server=None): + output = adb_command(None, 'devices',adb_server=adb_server) devices = [] for line in output.splitlines(): parts = [p.strip() for p in line.split()] @@ -411,14 +443,39 @@ def adb_list_devices(): return devices -def adb_command(device, command, timeout=None): +def get_adb_command(device, command, timeout=None,adb_server=None): _check_env() - device_string = ' -s {}'.format(device) if device else '' - full_command = "adb{} {}".format(device_string, command) + device_string = "" + if adb_server != None: + device_string = ' -H {}'.format(adb_server) + device_string += ' -s {}'.format(device) if device else '' + return "adb{} {}".format(device_string, command) + +def adb_command(device, command, timeout=None,adb_server=None): + full_command = get_adb_command(device, command, timeout, adb_server) logger.debug(full_command) output, _ = check_output(full_command, timeout, shell=True) return output +def grant_app_permissions(target, package): + """ + Grant an app all the permissions it may ask for + """ + dumpsys = target.execute('dumpsys package {}'.format(package)) + + permissions = re.search( + 'requested permissions:\s*(?P<permissions>(android.permission.+\s*)+)', dumpsys + ) + if permissions is None: + return + permissions = permissions.group('permissions').replace(" ", "").splitlines() + + for permission in permissions: + try: + target.execute('pm grant {} {}'.format(package, permission)) + except TargetError: + logger.debug('Cannot grant {}'.format(permission)) + # Messy environment initialisation stuff... @@ -494,3 +551,109 @@ def _check_env(): platform_tools = _env.platform_tools adb = _env.adb aapt = _env.aapt + +class LogcatMonitor(object): + """ + Helper class for monitoring Anroid's logcat + + :param target: Android target to monitor + :type target: :class:`AndroidTarget` + + :param regexps: List of uncompiled regular expressions to filter on the + device. Logcat entries that don't match any will not be + seen. If omitted, all entries will be sent to host. + :type regexps: list(str) + """ + + @property + def logfile(self): + return self._logfile + + def __init__(self, target, regexps=None): + super(LogcatMonitor, self).__init__() + + self.target = target + self._regexps = regexps + + def start(self, outfile=None): + """ + Start logcat and begin monitoring + + :param outfile: Optional path to file to store all logcat entries + :type outfile: str + """ + if outfile: + self._logfile = open(outfile, 'w') + else: + self._logfile = tempfile.NamedTemporaryFile() + + self.target.clear_logcat() + + logcat_cmd = 'logcat' + + # Join all requested regexps with an 'or' + if self._regexps: + regexp = '{}'.format('|'.join(self._regexps)) + if len(self._regexps) > 1: + regexp = '({})'.format(regexp) + logcat_cmd = '{} -e "{}"'.format(logcat_cmd, regexp) + + logcat_cmd = get_adb_command(self.target.conn.device, logcat_cmd) + + logger.debug('logcat command ="{}"'.format(logcat_cmd)) + self._logcat = pexpect.spawn(logcat_cmd, logfile=self._logfile) + + def stop(self): + self._logcat.terminate() + self._logfile.close() + + def get_log(self): + """ + Return the list of lines found by the monitor + """ + with open(self._logfile.name) as fh: + return [line for line in fh] + + def clear_log(self): + with open(self._logfile.name, 'w') as fh: + pass + + def search(self, regexp): + """ + Search a line that matches a regexp in the logcat log + Return immediatly + """ + return [line for line in self.get_log() if re.match(regexp, line)] + + def wait_for(self, regexp, timeout=30): + """ + Search a line that matches a regexp in the logcat log + Wait for it to appear if it's not found + + :param regexp: regexp to search + :type regexp: str + + :param timeout: Timeout in seconds, before rasing RuntimeError. + ``None`` means wait indefinitely + :type timeout: number + + :returns: List of matched strings + """ + log = self.get_log() + res = [line for line in log if re.match(regexp, line)] + + # Found some matches, return them + if len(res) > 0: + return res + + # Store the number of lines we've searched already, so we don't have to + # re-grep them after 'expect' returns + next_line_num = len(log) + + try: + self._logcat.expect(regexp, timeout=timeout) + except pexpect.TIMEOUT: + raise RuntimeError('Logcat monitor timeout ({}s)'.format(timeout)) + + return [line for line in self.get_log()[next_line_num:] + if re.match(regexp, line)] |