add --clear-cache option to scanner

This commit is contained in:
Michael Pöhn 2022-09-23 18:33:02 +02:00
parent e4b54fe4a7
commit d5ef1b2e95
2 changed files with 83 additions and 44 deletions

View file

@ -16,26 +16,26 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import imghdr
import json
import os import os
import re import re
import sys import sys
import traceback import json
import zipfile
import yaml import yaml
import imghdr
import shutil
import logging
import zipfile
import requests
import itertools
import traceback
import urllib.request
from argparse import ArgumentParser from argparse import ArgumentParser
from collections import namedtuple from collections import namedtuple
from copy import deepcopy from copy import deepcopy
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from pathlib import Path from pathlib import Path
import logging
import itertools
import urllib.request
from datetime import datetime, timedelta from datetime import datetime, timedelta
import requests
from . import _ from . import _
from . import common from . import common
from . import metadata from . import metadata
@ -145,18 +145,24 @@ def _scanner_cachedir():
""" """
get `Path` to local cache dir get `Path` to local cache dir
""" """
if not common.config or "cachedir_scanner" not in common.config: if not common.config:
raise ConfigurationException("could not load 'cachedir_scanner' config") raise ConfigurationException('config not initialized')
cachedir = Path(config["cachedir_scanner"]) if "cachedir_scanner" not in common.config:
raise ConfigurationException("could not load 'cachedir_scanner' from config")
cachedir = Path(common.config["cachedir_scanner"])
cachedir.mkdir(exist_ok=True, parents=True) cachedir.mkdir(exist_ok=True, parents=True)
return cachedir return cachedir
class SignatureCacheMalformedException(Exception): class SignatureDataMalformedException(Exception):
pass pass
class SignatureCacheOutdatedException(Exception): class SignatureDataOutdatedException(Exception):
pass
class SignatureDataVersionMismatchException(Exception):
pass pass
@ -169,26 +175,37 @@ class SignatureDataController:
def check_data_version(self): def check_data_version(self):
if self.data.get("version") != SCANNER_CACHE_VERSION: if self.data.get("version") != SCANNER_CACHE_VERSION:
raise SignatureCacheMalformedException() raise SignatureDataVersionMismatchException()
def check_last_updated(self): def check_last_updated(self):
'''
NOTE: currently not in use
Checks if the timestamp value is ok. Raises an exception if something
is not ok.
:raises SignatureDataMalformedException: when timestamp value is
inaccessible or not parseable
:raises SignatureDataOutdatedException: when timestamp is older than
`self.cache_outdated_interval`
'''
timestamp = self.data.get("timestamp") timestamp = self.data.get("timestamp")
if not timestamp: if not timestamp:
raise SignatureCacheMalformedException() raise SignatureDataMalformedException()
try: try:
timestamp = datetime.fromisoformat(timestamp) timestamp = datetime.fromisoformat(timestamp)
except ValueError as e: except ValueError as e:
raise SignatureCacheMalformedException() from e raise SignatureDataMalformedException() from e
except TypeError as e: except TypeError as e:
raise SignatureCacheMalformedException() from e raise SignatureDataMalformedException() from e
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now(): if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureCacheOutdatedException() raise SignatureDataOutdatedException()
def load(self): def load(self):
try: try:
self.load_from_cache() self.load_from_cache()
self.verify_data() self.verify_data()
except SignatureCacheMalformedException as e: except (SignatureDataMalformedException, SignatureDataVersionMismatchException):
self.load_from_defaults() self.load_from_defaults()
self.write_to_cache() self.write_to_cache()
@ -200,7 +217,7 @@ class SignatureDataController:
def load_from_cache(self): def load_from_cache(self):
sig_file = scanner._scanner_cachedir() / self.filename sig_file = scanner._scanner_cachedir() / self.filename
if not sig_file.exists(): if not sig_file.exists():
raise SignatureCacheMalformedException() raise SignatureDataMalformedException()
with open(sig_file) as f: with open(sig_file) as f:
self.data = json.load(f) self.data = json.load(f)
@ -211,7 +228,12 @@ class SignatureDataController:
logging.debug("write '{}' to cache".format(self.filename)) logging.debug("write '{}' to cache".format(self.filename))
def verify_data(self): def verify_data(self):
'''
cleans and validates `self.data`
'''
self.check_data_version()
valid_keys = ['timestamp', 'version', 'signatures'] valid_keys = ['timestamp', 'version', 'signatures']
for k in [x for x in self.data.keys() if x not in valid_keys]: for k in [x for x in self.data.keys() if x not in valid_keys]:
del self.data[k] del self.data[k]
@ -264,23 +286,35 @@ class ScannerTool():
self.compile_regexes() self.compile_regexes()
def compile_regexes(self): def compile_regexes(self):
self.regex = {'code_signatures': {}, 'gradle_signatures': {}} self.err_regex = {'code_signatures': {}, 'gradle_signatures': {}}
for sdc in self.sdcs: for sdc in self.sdcs:
for signame, sigdef in sdc.data.get('signatures', {}).items(): for signame, sigdef in sdc.data.get('signatures', {}).items():
for sig in sigdef.get('code_signatures', []): for sig in sigdef.get('code_signatures', []):
self.regex['code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) self.err_regex['code_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
for sig in sigdef.get('gradle_signatures', []): for sig in sigdef.get('gradle_signatures', []):
self.regex['gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE) self.err_regex['gradle_signatures'][sig] = re.compile('.*' + sig, re.IGNORECASE)
def clear_cache(self):
# delete cache folder and all its contents
shutil.rmtree(scanner._scanner_cachedir(), ignore_errors=True)
# re-initialize, this will re-populate the cache from default values
self.__init__()
# TODO: change this from global instance to dependency injection # TODO: change this from singleton instance to dependency injection
SCANNER_TOOL = None # use `_get_tool()` instead of accessing this directly
_SCANNER_TOOL = None
def _get_tool(): def _get_tool():
if not scanner.SCANNER_TOOL: '''
scanner.SCANNER_TOOL = ScannerTool() lazy loading factory for ScannerTool singleton
return scanner.SCANNER_TOOL
ScannerTool initialization needs to access `common.config` values. Those are only available after initialization through `common.read_config()`, so this factory assumes `common.read_config()` was called at an earlier point in time.
'''
if not scanner._SCANNER_TOOL:
scanner._SCANNER_TOOL = ScannerTool()
return scanner._SCANNER_TOOL
# taken from exodus_core # taken from exodus_core
@ -310,7 +344,7 @@ def scan_binary(apkfile, extract_signatures=None):
result = get_embedded_classes(apkfile) result = get_embedded_classes(apkfile)
problems = 0 problems = 0
for classname in result: for classname in result:
for suspect, regexp in _get_tool().regex['code_signatures'].items(): for suspect, regexp in _get_tool().err_regex['code_signatures'].items():
if regexp.match(classname): if regexp.match(classname):
logging.debug("Found class '%s'" % classname) logging.debug("Found class '%s'" % classname)
problems += 1 problems += 1
@ -362,7 +396,7 @@ def scan_source(build_dir, build=metadata.Build()):
return any(al in s for al in allowlisted) return any(al in s for al in allowlisted)
def suspects_found(s): def suspects_found(s):
for n, r in _get_tool().regex['gradle_signatures'].items(): for n, r in _get_tool().err_regex['gradle_signatures'].items():
if r.match(s) and not is_allowlisted(s): if r.match(s) and not is_allowlisted(s):
yield n yield n
@ -652,6 +686,8 @@ def main():
help=_("Force scan of disabled apps and builds.")) help=_("Force scan of disabled apps and builds."))
parser.add_argument("--json", action="store_true", default=False, parser.add_argument("--json", action="store_true", default=False,
help=_("Output JSON to stdout.")) help=_("Output JSON to stdout."))
parser.add_argument("--clear-cache", action="store_true", default=False,
help=_("purge local scanner definitions cache"))
metadata.add_metadata_arguments(parser) metadata.add_metadata_arguments(parser)
options = parser.parse_args() options = parser.parse_args()
metadata.warnings_action = options.W metadata.warnings_action = options.W
@ -665,6 +701,9 @@ def main():
config = common.read_config(options) config = common.read_config(options)
if options.clear_cache:
scanner._get_tool().clear_cache()
probcount = 0 probcount = 0
exodus = [] exodus = []

View file

@ -448,9 +448,9 @@ class Test_scan_binary(unittest.TestCase):
fdroidserver.common.config = config fdroidserver.common.config = config
fdroidserver.common.options = mock.Mock() fdroidserver.common.options = mock.Mock()
fdroidserver.scanner.SIGNATURE_TOOL = mock.Mock() fdroidserver.scanner._SCANNER_TOOL = mock.Mock()
fdroidserver.scanner.SIGNATURE_TOOL.regex = {} fdroidserver.scanner._SCANNER_TOOL.err_regex = {}
fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
"java/lang/Object": re.compile(r'.*java/lang/Object', re.IGNORECASE | re.UNICODE) "java/lang/Object": re.compile(r'.*java/lang/Object', re.IGNORECASE | re.UNICODE)
} }
@ -459,7 +459,7 @@ class Test_scan_binary(unittest.TestCase):
self.assertEqual( self.assertEqual(
1, 1,
fdroidserver.scanner.scan_binary(apkfile), fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
) )
@unittest.skipIf( @unittest.skipIf(
@ -470,7 +470,7 @@ class Test_scan_binary(unittest.TestCase):
) )
def test_bottom_level_embedded_apk_code_signature(self): def test_bottom_level_embedded_apk_code_signature(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
"org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile( "org/bitbucket/tickytacky/mirrormirror/MainActivity": re.compile(
r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', re.IGNORECASE | re.UNICODE r'.*org/bitbucket/tickytacky/mirrormirror/MainActivity', re.IGNORECASE | re.UNICODE
) )
@ -479,12 +479,12 @@ class Test_scan_binary(unittest.TestCase):
self.assertEqual( self.assertEqual(
1, 1,
fdroidserver.scanner.scan_binary(apkfile), fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
) )
def test_top_level_signature_embedded_apk_present(self): def test_top_level_signature_embedded_apk_present(self):
apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk') apkfile = os.path.join(self.basedir, 'apk.embedded_1.apk')
fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'] = { fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'] = {
"org/fdroid/ci/BuildConfig": re.compile( "org/fdroid/ci/BuildConfig": re.compile(
r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE r'.*org/fdroid/ci/BuildConfig', re.IGNORECASE | re.UNICODE
) )
@ -492,7 +492,7 @@ class Test_scan_binary(unittest.TestCase):
self.assertEqual( self.assertEqual(
1, 1,
fdroidserver.scanner.scan_binary(apkfile), fdroidserver.scanner.scan_binary(apkfile),
"Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner.SIGNATURE_TOOL.regex['code_signatures'].values(), apkfile), "Did not find expected code signature '{}' in binary '{}'".format(fdroidserver.scanner._SCANNER_TOOL.err_regex['code_signatures'].values(), apkfile),
) )
# TODO: re-enable once allow-listing migrated to more complex regexes # TODO: re-enable once allow-listing migrated to more complex regexes
@ -640,24 +640,24 @@ class Test_SignatureDataController(unittest.TestCase):
def test_check_last_updated_exception_cache_outdated(self): def test_check_last_updated_exception_cache_outdated(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc.data['timestamp'] = (datetime.now().astimezone() - timedelta(days=30)).isoformat() sdc.data['timestamp'] = (datetime.now().astimezone() - timedelta(days=30)).isoformat()
with self.assertRaises(fdroidserver.scanner.SignatureCacheOutdatedException): with self.assertRaises(fdroidserver.scanner.SignatureDataOutdatedException):
sdc.check_last_updated() sdc.check_last_updated()
def test_check_last_updated_exception_missing_timestamp_value(self): def test_check_last_updated_exception_missing_timestamp_value(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated() sdc.check_last_updated()
def test_check_last_updated_exception_not_string(self): def test_check_last_updated_exception_not_string(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc.data['timestamp'] = 12345 sdc.data['timestamp'] = 12345
with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated() sdc.check_last_updated()
def test_check_last_updated_exception_not_iso_formatted_string(self): def test_check_last_updated_exception_not_iso_formatted_string(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
sdc.data['timestamp'] = '01/09/2002 10:11' sdc.data['timestamp'] = '01/09/2002 10:11'
with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): with self.assertRaises(fdroidserver.scanner.SignatureDataMalformedException):
sdc.check_last_updated() sdc.check_last_updated()
# check_data_version # check_data_version
@ -668,7 +668,7 @@ class Test_SignatureDataController(unittest.TestCase):
def test_check_data_version_exception(self): def test_check_data_version_exception(self):
sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml') sdc = fdroidserver.scanner.SignatureDataController('nnn', 'fff.yml')
with self.assertRaises(fdroidserver.scanner.SignatureCacheMalformedException): with self.assertRaises(fdroidserver.scanner.SignatureDataVersionMismatchException):
sdc.check_data_version() sdc.check_data_version()