aboutsummaryrefslogtreecommitdiff
path: root/mobly/controllers/sniffer_lib/local/local_base.py
blob: a9fb6738b527757c7f9f93d9315782d6e921c0b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Class for Local sniffers - i.e. running on the local machine.

This class provides configuration for local interfaces but leaves
the actual capture (sniff) to sub-classes.
"""

import os
import shutil
import signal
import subprocess
import tempfile
from mobly import logger
from mobly import utils
from mobly.controllers import sniffer


class SnifferLocalBase(sniffer.Sniffer):
  """This class defines the common behaviors of WLAN sniffers running on
    WLAN interfaces of the local machine.

    Specific mechanisms to capture packets over the local WLAN interfaces are
    implemented by sub-classes of this class - i.e. it is not a final class.
    """

  def __init__(self, interface, logger, base_configs=None):
    """See base class documentation
        """
    self._base_configs = None
    self._capture_file_path = ""
    self._interface = ""
    self._logger = logger
    self._process = None
    self._temp_capture_file_path = ""

    if interface == "":
      raise sniffer.InvalidDataError("Empty interface provided")
    self._interface = interface
    self._base_configs = base_configs

    try:
      subprocess.check_call(['ifconfig', self._interface, 'down'])
      subprocess.check_call(['iwconfig', self._interface, 'mode', 'monitor'])
      subprocess.check_call(['ifconfig', self._interface, 'up'])
    except Exception as err:
      raise sniffer.ExecutionError(err)

  def get_interface(self):
    """See base class documentation
        """
    return self._interface

  def get_type(self):
    """See base class documentation
        """
    return "local"

  def get_capture_file(self):
    return self._capture_file_path

  def _pre_capture_config(self, override_configs=None):
    """Utility function which configures the wireless interface per the
        specified configurations. Operation is performed before every capture
        start using baseline configurations (specified when sniffer initialized)
        and override configurations specified here.
        """
    final_configs = {}
    if self._base_configs:
      final_configs.update(self._base_configs)
    if override_configs:
      final_configs.update(override_configs)

    if sniffer.Sniffer.CONFIG_KEY_CHANNEL in final_configs:
      try:
        subprocess.check_call([
            'iwconfig', self._interface, 'channel',
            str(final_configs[sniffer.Sniffer.CONFIG_KEY_CHANNEL])
        ])
      except Exception as err:
        raise sniffer.ExecutionError(err)

  def _get_command_line(self,
                        additional_args=None,
                        duration=None,
                        packet_count=None):
    """Utility function to be implemented by every child class - which
        are the concrete sniffer classes. Each sniffer-specific class should
        derive the command line to execute its sniffer based on the specified
        arguments.
        """
    raise NotImplementedError("Base class should not be called directly!")

  def _post_process(self):
    """Utility function which is executed after a capture is done. It
        moves the capture file to the requested location.
        """
    self._process = None
    shutil.move(self._temp_capture_file_path, self._capture_file_path)

  def start_capture(self,
                    override_configs=None,
                    additional_args=None,
                    duration=None,
                    packet_count=None):
    """See base class documentation
        """
    if self._process is not None:
      raise sniffer.InvalidOperationError(
          "Trying to start a sniff while another is still running!")
    capture_dir = os.path.join(self._logger.log_path,
                               "Sniffer-{}".format(self._interface))
    os.makedirs(capture_dir, exist_ok=True)
    self._capture_file_path = os.path.join(
        capture_dir, "capture_{}.pcap".format(logger.get_log_file_timestamp()))

    self._pre_capture_config(override_configs)
    _, self._temp_capture_file_path = tempfile.mkstemp(suffix=".pcap")

    cmd = self._get_command_line(additional_args=additional_args,
                                 duration=duration,
                                 packet_count=packet_count)

    self._process = utils.start_standing_subprocess(cmd)
    return sniffer.ActiveCaptureContext(self, duration)

  def stop_capture(self):
    """See base class documentation
        """
    if self._process is None:
      raise sniffer.InvalidOperationError(
          "Trying to stop a non-started process")
    utils.stop_standing_subprocess(self._process)
    self._post_process()

  def wait_for_capture(self, timeout=None):
    """See base class documentation
        """
    if self._process is None:
      raise sniffer.InvalidOperationError(
          "Trying to wait on a non-started process")
    try:
      utils.wait_for_standing_subprocess(self._process, timeout)
      self._post_process()
    except subprocess.TimeoutExpired:
      self.stop_capture()