diff --git a/fdroidserver/common.py b/fdroidserver/common.py index f4a1fe77..0da602ad 100644 --- a/fdroidserver/common.py +++ b/fdroidserver/common.py @@ -76,7 +76,7 @@ from fdroidserver.exception import FDroidException, VCSException, NoSubmodulesEx BuildException, VerificationException, MetaDataException from .asynchronousfilereader import AsynchronousFileReader -from . import apksigcopier +from . import apksigcopier, common # The path to this fdroidserver distribution @@ -321,6 +321,22 @@ def fill_config_defaults(thisconfig): break +def get_config(options=None): + """ + helper function for getting access to commons.config while safely + initializing if it wasn't initialized yet. + """ + global config + + if config is not None: + return config + + config = {} + common.fill_config_defaults(config) + common.read_config(options) + return config + + def regsub_file(pattern, repl, path): with open(path, 'rb') as f: text = f.read() diff --git a/fdroidserver/scanner.py b/fdroidserver/scanner.py index 8dcb512a..31344288 100644 --- a/fdroidserver/scanner.py +++ b/fdroidserver/scanner.py @@ -20,7 +20,6 @@ import os import re import sys import json -import yaml import imghdr import shutil import logging @@ -42,7 +41,6 @@ from . import metadata from .exception import BuildException, VCSException, ConfigurationException from . import scanner -config = None options = None DEFAULT_JSON_PER_BUILD = {'errors': [], 'warnings': [], 'infos': []} # type: ignore @@ -136,7 +134,7 @@ def _exodus_compile_signatures(signatures): def _datetime_now(): """ - simple warpper for datetime.now to allow mocking it for testing + simple wrapper for datetime.now to allow mocking it for testing """ return datetime.now().astimezone() @@ -145,11 +143,12 @@ def _scanner_cachedir(): """ get `Path` to local cache dir """ - if not common.config: + cfg = common.get_config() + if not cfg: raise ConfigurationException('config not initialized') - if "cachedir_scanner" not in common.config: + if "cachedir_scanner" not in cfg: raise ConfigurationException("could not load 'cachedir_scanner' from config") - cachedir = Path(common.config["cachedir_scanner"]) + cachedir = Path(cfg["cachedir_scanner"]) cachedir.mkdir(exist_ok=True, parents=True) return cachedir @@ -170,7 +169,7 @@ class SignatureDataController: def __init__(self, name, filename): self.name = name self.filename = filename - self.cache_outdated_interval = timedelta(days=7) + self.cache_outdated_interval = None self.data = {} def check_data_version(self): @@ -198,8 +197,27 @@ class SignatureDataController: raise SignatureDataMalformedException() from e except TypeError as e: raise SignatureDataMalformedException() from e - if (timestamp + self.cache_outdated_interval) < scanner._datetime_now(): - raise SignatureDataOutdatedException() + if self.cache_outdated_interval: + if (timestamp + self.cache_outdated_interval) < scanner._datetime_now(): + raise SignatureDataOutdatedException() + + def fetch(self): + try: + self.load_from_cache() + self.verify_data() + self.check_last_updated() + except ( + SignatureDataMalformedException, + SignatureDataVersionMismatchException, + SignatureDataOutdatedException + ): + try: + self.fetch_signatures_from_web() + except AttributeError: + # just load from defaults if fetch_signatures_from_web is not + # implemented + self.load_from_defaults() + self.write_to_cache() def load(self): try: @@ -237,62 +255,70 @@ class SignatureDataController: for k in [x for x in self.data.keys() if x not in valid_keys]: del self.data[k] - # def scan - class ExodusSignatureDataController(SignatureDataController): def __init__(self): super().__init__('Exodus signatures', 'exodus.yml') + self.cache_outdated_interval = timedelta(days=1) # refresh exodus cache after one day - def fetch_signatures_from_web(): - pass - # TODO - # exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers" - # sigs = { - # "signatures": [], - # "timestamp": scanner._datetime_now().isoformat(), - # "version": SCANNER_CACHE_VERSION, - # } + def fetch_signatures_from_web(self): + exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers" + self.data = { + "signatures": {}, + "timestamp": scanner._datetime_now().isoformat(), + "version": SCANNER_CACHE_VERSION, + } - # with urllib.request.urlopen(exodus_url) as f: - # data = json.load(f) - # for tracker in data["trackers"].values(): - # sigs["signatures"].append({ - # "name": tracker["name"], - # "binary_signature": tracker["code_signature"], - # "network_signature": tracker["network_signature"], - # "types": ["tracker", "non-free"] # right now we assume all trackers in exodus are non-free - # }) + with urllib.request.urlopen(exodus_url) as f: + d = json.load(f) + for tracker in d["trackers"].values(): + if tracker.get('code_signature'): + self.data["signatures"][tracker["name"]] = { + "name": tracker["name"], + "warn_code_signatures": [tracker["code_signature"]], + # exodus also provides network signatures, unused atm. + # "network_signatures": [tracker["network_signature"]], + "AntiFeatures": ["Tracking"], + "license": "NonFree" # We assume all trackers in exodus + # are non-free, alought free + # trackers like piwik, acra, + # etc. might be listed by exodus + # too. + } class ScannerSignatureDataController(SignatureDataController): def __init__(self): super().__init__('Scanner signatures', 'scanner.json') - def fetch_signatures_from_web(self): - url = "https://uniqx.gitlab.io/fdroid-scanner-signatures/sigs.json" - with urllib.request.urlopen(url) as f: - data = yaml.safe_load(f) - # TODO: validate parsed data - # TODO: error message 'please update fdroidserver/report' when fetching failed due to changes in the data strucutre - self.data = data - class ScannerTool(): def __init__(self): self.sdcs = [ScannerSignatureDataController()] - for sdc in self.sdcs: - sdc.load() + self.load() self.compile_regexes() + def load(self): + for sdc in self.sdcs: + sdc.load() + def compile_regexes(self): - self.err_regex = {'code_signatures': {}, 'gradle_signatures': {}} + self.regexs = { + 'err_code_signatures': {}, + 'err_gradle_signatures': {}, + 'warn_code_signatures': {}, + 'warn_gradle_signatures': {}, + } for sdc in self.sdcs: for signame, sigdef in sdc.data.get('signatures', {}).items(): for sig in sigdef.get('code_signatures', []): - self.err_regex['code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) + self.regexs['err_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) for sig in sigdef.get('gradle_signatures', []): - self.err_regex['gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) + self.regexs['err_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) + for sig in sigdef.get('warn_code_signatures', []): + self.regexs['warn_code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) + for sig in sigdef.get('warn_gradle_signatures', []): + self.regexs['warn_gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) def clear_cache(self): # delete cache folder and all its contents @@ -300,6 +326,10 @@ class ScannerTool(): # re-initialize, this will re-populate the cache from default values self.__init__() + def add(self, new_controller: SignatureDataController): + self.sdcs.append(new_controller) + self.compile_regexes() + # TODO: change this from singleton instance to dependency injection # use `_get_tool()` instead of accessing this directly @@ -342,36 +372,20 @@ def scan_binary(apkfile, extract_signatures=None): """Scan output of dexdump for known non-free classes.""" logging.info(_('Scanning APK with dexdump for known non-free classes.')) result = get_embedded_classes(apkfile) - problems = 0 + problems, warnings = 0, 0 for classname in result: - for suspect, regexp in _get_tool().err_regex['code_signatures'].items(): + for suspect, regexp in _get_tool().regexs['warn_code_signatures'].items(): if regexp.match(classname): - logging.debug("Found class '%s'" % classname) + logging.debug("Warning: found class '%s'" % classname) + warnings += 1 + for suspect, regexp in _get_tool().regexs['err_code_signatures'].items(): + if regexp.match(classname): + logging.debug("Problem: found class '%s'" % classname) problems += 1 - - if extract_signatures: - - def _detect_tracker(sig, tracker, class_list): - for clazz in class_list: - if sig.search(clazz): - logging.debug("Found tracker, class {} matching {}".format(clazz, tracker.code_signature)) - return tracker - return None - - results = [] - args = [(extract_signatures[1][index], tracker, result) - for (index, tracker) in enumerate(extract_signatures[0]) if - len(tracker.code_signature) > 3] - - for res in itertools.starmap(_detect_tracker, args): - if res: - results.append(res) - - trackers = [t for t in results if t is not None] - problems += len(trackers) - + if warnings: + logging.warning(_("Found {count} warnings in {filename}").format(count=warnings, filename=apkfile)) if problems: - logging.critical("Found problems in %s" % apkfile) + logging.critical(_("Found {count} problems in {filename}").format(count=problems, filename=apkfile)) return problems @@ -396,7 +410,7 @@ def scan_source(build_dir, build=metadata.Build()): return any(al in s for al in allowlisted) def suspects_found(s): - for n, r in _get_tool().err_regex['gradle_signatures'].items(): + for n, r in _get_tool().regexs['err_gradle_signatures'].items(): if r.match(s) and not is_allowlisted(s): yield n @@ -669,7 +683,7 @@ def scan_source(build_dir, build=metadata.Build()): def main(): - global config, options, json_per_build + global options, json_per_build # Parse command line... parser = ArgumentParser( @@ -699,24 +713,25 @@ def main(): else: logging.getLogger().setLevel(logging.ERROR) - config = common.read_config(options) + # initialize/load configuration values + common.get_config(options) if options.clear_cache: scanner._get_tool().clear_cache() + if options.exodus: + c = ExodusSignatureDataController() + c.fetch() + scanner._get_tool().add(c) probcount = 0 - exodus = [] - if options.exodus: - exodus = load_exodus_trackers_signatures() - appids = [] for apk in options.appid: if os.path.isfile(apk): - count = scanner.scan_binary(apk, exodus) + count = scanner.scan_binary(apk) if count > 0: logging.warning( - _('Scanner found {count} problems in {apk}:').format( + _('Scanner found {count} problems in {apk}').format( count=count, apk=apk ) ) diff --git a/tests/build.TestCase b/tests/build.TestCase index 13378060..cca5fa0c 100755 --- a/tests/build.TestCase +++ b/tests/build.TestCase @@ -402,7 +402,7 @@ class BuildTest(unittest.TestCase): os.chdir(testdir) os.mkdir("build") - config = dict() + config = fdroidserver.common.get_config() config['sdk_path'] = os.getenv('ANDROID_HOME') config['ndk_paths'] = {'r10d': os.getenv('ANDROID_NDK_HOME')} fdroidserver.common.config = config diff --git a/tests/scanner.TestCase b/tests/scanner.TestCase index bb595832..ade8bd0a 100755 --- a/tests/scanner.TestCase +++ b/tests/scanner.TestCase @@ -31,7 +31,7 @@ import fdroidserver.build import fdroidserver.common import fdroidserver.metadata import fdroidserver.scanner -from testcommon import TmpCwd +from testcommon import TmpCwd, mock_open_to_str class ScannerTest(unittest.TestCase): @@ -449,17 +449,18 @@ class Test_scan_binary(unittest.TestCase): fdroidserver.common.options = mock.Mock() fdroidserver.scanner._SCANNER_TOOL = mock.Mock() - fdroidserver.scanner._SCANNER_TOOL.err_regex = {} - fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = { + fdroidserver.scanner._SCANNER_TOOL.regexs = {} + fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = { "java/lang/Object": re.compile(r'.*java/lang/Object', re.IGNORECASE | re.UNICODE) } + fdroidserver.scanner._SCANNER_TOOL.regexs['warn_code_signatures'] = {} def test_code_signature_match(self): apkfile = os.path.join(self.basedir, 'no_targetsdk_minsdk1_unsigned.apk') self.assertEqual( 1, fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile), ) @unittest.skipIf( @@ -470,7 +471,7 @@ class Test_scan_binary(unittest.TestCase): ) def test_bottom_level_embedded_apk_code_signature(self): apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') - fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = { + fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = { "org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile( r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', re.IGNORECASE | re.UNICODE ) @@ -479,12 +480,12 @@ class Test_scan_binary(unittest.TestCase): self.assertEqual( 1, fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile), ) def test_top_level_signature_embedded_apk_present(self): apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') - fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = { + fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'] = { "org/fdroid/ci/BuildConfig": re.compile( r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE ) @@ -492,7 +493,7 @@ class Test_scan_binary(unittest.TestCase): self.assertEqual( 1, fdroidserver.scanner.scan_binary(apkfile), - "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile), + "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.regexs['err_code_signatures'].values(), apkfile), ) # TODO: re-enable once allow-listing migrated to more complex regexes @@ -671,6 +672,23 @@ class Test_SignatureDataController(unittest.TestCase): with self.assertRaises(fdroidserver.scanner.SignatureDataVersionMismatchException): sdc.check_data_version() + def test_write_to_cache(self): + open_func = mock.mock_open() + sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') + sdc.data = {"mocked": "data"} + + with mock.patch("builtins.open", open_func), mock.patch( + "fdroidserver.scanner._scanner_cachedir", + return_value=pathlib.Path('.'), + ): + sdc.write_to_cache() + + open_func.assert_called_with(pathlib.Path('fff.yml'), 'w', encoding="utf-8") + self.assertEqual( + mock_open_to_str(open_func), + """{\n "mocked": "data"\n}""" + ) + class Test_ScannerSignatureDataController_fetch_signatures_from_web(unittest.TestCase): def setUp(self): @@ -693,34 +711,6 @@ class Test_ScannerSignatureDataController_fetch_signatures_from_web(unittest.Tes - ads '''))) - def test_fetch_signatures_from_web(self): - sdc = fdroidserver.scanner.ScannerSignatureDataController() - with unittest.mock.patch('urllib.request.urlopen', self.uo_func): - sdc.fetch_signatures_from_web() - self.assertEqual(sdc.data.get('version'), 999) - self.assertEqual(sdc.data.get('timestamp'), "1999-12-31T23:59:59.999999+00:00") - self.assertListEqual( - sdc.data.get('signatures'), - [ - { - 'binary_signature': 'com/google/firebase', - 'name': 'Google Firebase', - 'types': ['tracker', 'non-free'], - }, - { - 'gradle_signature': 'com/google/android/gms', - 'name': 'Google Mobile Services', - 'types': ['non-free'], - }, - { - 'network_signature': 'doubleclick\\.net', - 'name': 'Another thing to test.', - 'types': ['ads'], - }, - ] - ) - self.assertEqual(len(sdc.data), 3) - class Test_main(unittest.TestCase): def setUp(self): @@ -768,7 +758,7 @@ class Test_main(unittest.TestCase): self.exit_func.assert_not_called() self.read_app_args_func.assert_not_called() - self.scan_binary_func.assert_called_once_with('local.application.apk', []) + self.scan_binary_func.assert_called_once_with('local.application.apk') if __name__ == "__main__": diff --git a/tests/testcommon.py b/tests/testcommon.py index 128ead00..29b3f9b6 100644 --- a/tests/testcommon.py +++ b/tests/testcommon.py @@ -50,6 +50,11 @@ class TmpPyPath(): def mock_open_to_str(mock): + """ + helper function for accessing all data written into a + unittest.mock.mock_open() instance as a string. + """ + return "".join([ x.args[0] for x in mock.mock_calls if str(x).startswith("call().write(") ])