aboutsummaryrefslogtreecommitdiff
path: root/devlib/utils/android.py
diff options
context:
space:
mode:
Diffstat (limited to 'devlib/utils/android.py')
-rw-r--r--devlib/utils/android.py207
1 files changed, 185 insertions, 22 deletions
diff --git a/devlib/utils/android.py b/devlib/utils/android.py
index d2a1629..bdf2854 100644
--- a/devlib/utils/android.py
+++ b/devlib/utils/android.py
@@ -20,15 +20,20 @@ Utility functions for working with Android devices through adb.
"""
# pylint: disable=E1103
import os
+import pexpect
import time
import subprocess
import logging
import re
+import threading
+import tempfile
+import Queue
from collections import defaultdict
from devlib.exception import TargetError, HostError, DevlibError
-from devlib.utils.misc import check_output, which, memoized
+from devlib.utils.misc import check_output, which, memoized, ABI_MAP
from devlib.utils.misc import escape_single_quotes, escape_double_quotes
+from devlib import host
logger = logging.getLogger('android')
@@ -124,13 +129,18 @@ class ApkInfo(object):
self.label = None
self.version_name = None
self.version_code = None
+ self.native_code = None
self.parse(path)
def parse(self, apk_path):
_check_env()
command = [aapt, 'dump', 'badging', apk_path]
logger.debug(' '.join(command))
- output = subprocess.check_output(command)
+ try:
+ output = subprocess.check_output(command, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ raise HostError('Error parsing APK file {}. `aapt` says:\n{}'
+ .format(apk_path, e.output))
for line in output.split('\n'):
if line.startswith('application-label:'):
self.label = line.split(':')[1].strip().replace('\'', '')
@@ -143,6 +153,19 @@ class ApkInfo(object):
elif line.startswith('launchable-activity:'):
match = self.name_regex.search(line)
self.activity = match.group('name')
+ elif line.startswith('native-code'):
+ apk_abis = [entry.strip() for entry in line.split(':')[1].split("'") if entry.strip()]
+ mapped_abis = []
+ for apk_abi in apk_abis:
+ found = False
+ for abi, architectures in ABI_MAP.iteritems():
+ if apk_abi in architectures:
+ mapped_abis.append(abi)
+ found = True
+ break
+ if not found:
+ mapped_abis.append(apk_abi)
+ self.native_code = mapped_abis
else:
pass # not interested
@@ -163,7 +186,7 @@ class AdbConnection(object):
@memoized
def newline_separator(self):
output = adb_command(self.device,
- "shell '({}); echo \"\n$?\"'".format(self.ls_command))
+ "shell '({}); echo \"\n$?\"'".format(self.ls_command), adb_server=self.adb_server)
if output.endswith('\r\n'):
return '\r\n'
elif output.endswith('\n'):
@@ -178,7 +201,7 @@ class AdbConnection(object):
def _setup_ls(self):
command = "shell '(ls -1); echo \"\n$?\"'"
try:
- output = adb_command(self.device, command, timeout=self.timeout)
+ output = adb_command(self.device, command, timeout=self.timeout, adb_server=self.adb_server)
except subprocess.CalledProcessError as e:
raise HostError(
'Failed to set up ls command on Android device. Output:\n'
@@ -191,11 +214,12 @@ class AdbConnection(object):
self.ls_command = 'ls'
logger.debug("ls command is set to {}".format(self.ls_command))
- def __init__(self, device=None, timeout=None, platform=None):
+ def __init__(self, device=None, timeout=None, platform=None, adb_server=None):
self.timeout = timeout if timeout is not None else self.default_timeout
if device is None:
- device = adb_get_device(timeout=timeout)
+ device = adb_get_device(timeout=timeout, adb_server=adb_server)
self.device = device
+ self.adb_server = adb_server
adb_connect(self.device)
AdbConnection.active_connections[self.device] += 1
self._setup_ls()
@@ -206,7 +230,7 @@ class AdbConnection(object):
command = "push '{}' '{}'".format(source, dest)
if not os.path.exists(source):
raise HostError('No such file "{}"'.format(source))
- return adb_command(self.device, command, timeout=timeout)
+ return adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
def pull(self, source, dest, timeout=None):
if timeout is None:
@@ -215,18 +239,18 @@ class AdbConnection(object):
if os.path.isdir(dest) and \
('*' in source or '?' in source):
command = 'shell {} {}'.format(self.ls_command, source)
- output = adb_command(self.device, command, timeout=timeout)
+ output = adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
for line in output.splitlines():
command = "pull '{}' '{}'".format(line.strip(), dest)
- adb_command(self.device, command, timeout=timeout)
+ adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
return
command = "pull '{}' '{}'".format(source, dest)
- return adb_command(self.device, command, timeout=timeout)
+ return adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
def execute(self, command, timeout=None, check_exit_code=False,
as_root=False, strip_colors=True):
return adb_shell(self.device, command, timeout, check_exit_code,
- as_root, self.newline_separator)
+ as_root, self.newline_separator,adb_server=self.adb_server)
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
return adb_background_shell(self.device, command, stdout, stderr, as_root)
@@ -258,7 +282,7 @@ def fastboot_flash_partition(partition, path_to_image):
fastboot_command(command)
-def adb_get_device(timeout=None):
+def adb_get_device(timeout=None, adb_server=None):
"""
Returns the serial number of a connected android device.
@@ -267,13 +291,17 @@ def adb_get_device(timeout=None):
"""
# TODO this is a hacky way to issue a adb command to all listed devices
+ # Ensure server is started so the 'daemon started successfully' message
+ # doesn't confuse the parsing below
+ adb_command(None, 'start-server', adb_server=adb_server)
+
# The output of calling adb devices consists of a heading line then
# a list of the devices sperated by new line
# The last line is a blank new line. in otherwords, if there is a device found
# then the output length is 2 + (1 for each device)
start = time.time()
while True:
- output = adb_command(None, "devices").splitlines() # pylint: disable=E1103
+ output = adb_command(None, "devices", adb_server=adb_server).splitlines() # pylint: disable=E1103
output_length = len(output)
if output_length == 3:
# output[1] is the 2nd line in the output which has the device name
@@ -339,11 +367,14 @@ def _ping(device):
def adb_shell(device, command, timeout=None, check_exit_code=False,
- as_root=False, newline_separator='\r\n'): # NOQA
+ as_root=False, newline_separator='\r\n', adb_server=None): # NOQA
_check_env()
if as_root:
command = 'echo \'{}\' | su'.format(escape_single_quotes(command))
- device_part = ['-s', device] if device else []
+ device_part = []
+ if adb_server:
+ device_part = ['-H', adb_server]
+ device_part += ['-s', device] if device else []
# On older combinations of ADB/Android versions, the adb host command always
# exits with 0 if it was able to run the command on the target, even if the
@@ -381,8 +412,9 @@ def adb_shell(device, command, timeout=None, check_exit_code=False,
raise TargetError(message.format(re_search[0]))
else:
message = 'adb has returned early; did not get an exit code. '\
- 'Was kill-server invoked?'
- raise TargetError(message)
+ 'Was kill-server invoked?\nOUTPUT:\n-----\n{}\n'\
+ '-----\nERROR:\n-----\n{}\n-----'
+ raise TargetError(message.format(raw_output, error))
return output
@@ -401,8 +433,8 @@ def adb_background_shell(device, command,
return subprocess.Popen(full_command, stdout=stdout, stderr=stderr, shell=True)
-def adb_list_devices():
- output = adb_command(None, 'devices')
+def adb_list_devices(adb_server=None):
+ output = adb_command(None, 'devices',adb_server=adb_server)
devices = []
for line in output.splitlines():
parts = [p.strip() for p in line.split()]
@@ -411,14 +443,39 @@ def adb_list_devices():
return devices
-def adb_command(device, command, timeout=None):
+def get_adb_command(device, command, timeout=None,adb_server=None):
_check_env()
- device_string = ' -s {}'.format(device) if device else ''
- full_command = "adb{} {}".format(device_string, command)
+ device_string = ""
+ if adb_server != None:
+ device_string = ' -H {}'.format(adb_server)
+ device_string += ' -s {}'.format(device) if device else ''
+ return "adb{} {}".format(device_string, command)
+
+def adb_command(device, command, timeout=None,adb_server=None):
+ full_command = get_adb_command(device, command, timeout, adb_server)
logger.debug(full_command)
output, _ = check_output(full_command, timeout, shell=True)
return output
+def grant_app_permissions(target, package):
+ """
+ Grant an app all the permissions it may ask for
+ """
+ dumpsys = target.execute('dumpsys package {}'.format(package))
+
+ permissions = re.search(
+ 'requested permissions:\s*(?P<permissions>(android.permission.+\s*)+)', dumpsys
+ )
+ if permissions is None:
+ return
+ permissions = permissions.group('permissions').replace(" ", "").splitlines()
+
+ for permission in permissions:
+ try:
+ target.execute('pm grant {} {}'.format(package, permission))
+ except TargetError:
+ logger.debug('Cannot grant {}'.format(permission))
+
# Messy environment initialisation stuff...
@@ -494,3 +551,109 @@ def _check_env():
platform_tools = _env.platform_tools
adb = _env.adb
aapt = _env.aapt
+
+class LogcatMonitor(object):
+ """
+ Helper class for monitoring Anroid's logcat
+
+ :param target: Android target to monitor
+ :type target: :class:`AndroidTarget`
+
+ :param regexps: List of uncompiled regular expressions to filter on the
+ device. Logcat entries that don't match any will not be
+ seen. If omitted, all entries will be sent to host.
+ :type regexps: list(str)
+ """
+
+ @property
+ def logfile(self):
+ return self._logfile
+
+ def __init__(self, target, regexps=None):
+ super(LogcatMonitor, self).__init__()
+
+ self.target = target
+ self._regexps = regexps
+
+ def start(self, outfile=None):
+ """
+ Start logcat and begin monitoring
+
+ :param outfile: Optional path to file to store all logcat entries
+ :type outfile: str
+ """
+ if outfile:
+ self._logfile = open(outfile, 'w')
+ else:
+ self._logfile = tempfile.NamedTemporaryFile()
+
+ self.target.clear_logcat()
+
+ logcat_cmd = 'logcat'
+
+ # Join all requested regexps with an 'or'
+ if self._regexps:
+ regexp = '{}'.format('|'.join(self._regexps))
+ if len(self._regexps) > 1:
+ regexp = '({})'.format(regexp)
+ logcat_cmd = '{} -e "{}"'.format(logcat_cmd, regexp)
+
+ logcat_cmd = get_adb_command(self.target.conn.device, logcat_cmd)
+
+ logger.debug('logcat command ="{}"'.format(logcat_cmd))
+ self._logcat = pexpect.spawn(logcat_cmd, logfile=self._logfile)
+
+ def stop(self):
+ self._logcat.terminate()
+ self._logfile.close()
+
+ def get_log(self):
+ """
+ Return the list of lines found by the monitor
+ """
+ with open(self._logfile.name) as fh:
+ return [line for line in fh]
+
+ def clear_log(self):
+ with open(self._logfile.name, 'w') as fh:
+ pass
+
+ def search(self, regexp):
+ """
+ Search a line that matches a regexp in the logcat log
+ Return immediatly
+ """
+ return [line for line in self.get_log() if re.match(regexp, line)]
+
+ def wait_for(self, regexp, timeout=30):
+ """
+ Search a line that matches a regexp in the logcat log
+ Wait for it to appear if it's not found
+
+ :param regexp: regexp to search
+ :type regexp: str
+
+ :param timeout: Timeout in seconds, before rasing RuntimeError.
+ ``None`` means wait indefinitely
+ :type timeout: number
+
+ :returns: List of matched strings
+ """
+ log = self.get_log()
+ res = [line for line in log if re.match(regexp, line)]
+
+ # Found some matches, return them
+ if len(res) > 0:
+ return res
+
+ # Store the number of lines we've searched already, so we don't have to
+ # re-grep them after 'expect' returns
+ next_line_num = len(log)
+
+ try:
+ self._logcat.expect(regexp, timeout=timeout)
+ except pexpect.TIMEOUT:
+ raise RuntimeError('Logcat monitor timeout ({}s)'.format(timeout))
+
+ return [line for line in self.get_log()[next_line_num:]
+ if re.match(regexp, line)]