basic downloading for scan_binary signatures

This commit is contained in:
Michael Pöhn 2022-09-22 01:33:23 +02:00
parent 82355b8559
commit f56b1f3012
5 changed files with 351 additions and 62 deletions

View file

@ -23,19 +23,23 @@ import re
import sys
import traceback
import zipfile
import yaml
from argparse import ArgumentParser
from collections import namedtuple
from copy import deepcopy
from tempfile import TemporaryDirectory
from pathlib import Path
import logging
import itertools
import urllib.request
from datetime import datetime, timedelta
import requests
from . import _
from . import common
from . import metadata
from .exception import BuildException, VCSException
from .exception import BuildException, VCSException, ConfigurationException
from . import scanner
config = None
@ -47,17 +51,6 @@ json_per_build = deepcopy(DEFAULT_JSON_PER_BUILD)
MAVEN_URL_REGEX = re.compile(r"""\smaven\s*(?:{.*?(?:setUrl|url)|\((?:url)?)\s*=?\s*(?:uri)?\(?\s*["']?([^\s"']+)["']?[^})]*[)}]""",
re.DOTALL)
CODE_SIGNATURES = {
exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [
r'com/google/firebase',
r'com/google/android/gms',
r'com/google/android/play/core',
r'com/google/tagmanager',
r'com/google/analytics',
r'com/android/billing',
]
}
# Common known non-free blobs (always lower case):
NON_FREE_GRADLE_LINES = {
exp: re.compile(r'.*' + exp, re.IGNORECASE) for exp in [
@ -109,6 +102,9 @@ NON_FREE_GRADLE_LINES = {
}
SCANNER_CACHE_VERSION = 1
def get_gradle_compile_commands(build):
compileCommands = ['compile',
'provided',
@ -188,6 +184,145 @@ def _exodus_compile_signatures(signatures):
return compiled_tracker_signature
def _datetime_now():
"""
simple warpper for datetime.now to allow mocking it for testing
"""
return datetime.now().astimezone()
def _scanner_cachedir():
"""
get `Path` to local cache dir
"""
if not common.config or "cachedir_scanner" not in common.config:
raise ConfigurationException("could not load 'cachedir_scanner' config")
cachedir = Path(config["cachedir_scanner"])
cachedir.mkdir(exist_ok=True, parents=True)
return cachedir
class SignatureCacheMalformedException(Exception):
pass
class SignatureCacheOutdatedException(Exception):
pass
class SignatureDataController:
def __init__(self, name, filename):
self.name = name
self.filename = filename
self.cache_outdated_interval = timedelta(days=7)
self.data = {}
def check_data_version(self):
if self.data.get("version") != SCANNER_CACHE_VERSION:
raise SignatureCacheMalformedException()
def check_last_updated(self):
timestamp = self.data.get("timestamp")
if not timestamp:
raise SignatureCacheMalformedException()
try:
timestamp = datetime.fromisoformat(timestamp)
except ValueError as e:
raise SignatureCacheMalformedException() from e
except TypeError as e:
raise SignatureCacheMalformedException() from e
if (timestamp + self.cache_outdated_interval) < scanner._datetime_now():
raise SignatureCacheOutdatedException()
def load_from_defaults(self):
sig_file = Path(__file__).absolute().parent / 'scanner_signatures' / self.file_name
with open(sig_file) as f:
self.data = yaml.safe_load(f)
def load_from_cache(self):
sig_file = scanner._scanner_cachedir() / self.filename
if not sig_file.exists():
raise SignatureCacheMalformedException()
with open(sig_file) as f:
self.data = yaml.safe_load(f)
def write_to_cache(self):
sig_file = scanner._scanner_cachedir() / self.filename
with open(sig_file, "w", encoding="utf-8") as f:
yaml.safe_dump(self.data, f)
logging.debug("write '{}' to cache".format(self.filename))
def verify_data(self):
valid_keys = ['timestamp', 'version', 'signatures']
for k in [x for x in self.data.keys() if x not in valid_keys]:
del self.data[k]
# def scan
class ExodusSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Exodus signatures', 'exodus.yml')
def fetch_signatures_from_web():
pass
# TODO
# exodus_url = "https://reports.exodus-privacy.eu.org/api/trackers"
# sigs = {
# "signatures": [],
# "timestamp": scanner._datetime_now().isoformat(),
# "version": SCANNER_CACHE_VERSION,
# }
# with urllib.request.urlopen(exodus_url) as f:
# data = json.load(f)
# for tracker in data["trackers"].values():
# sigs["signatures"].append({
# "name": tracker["name"],
# "binary_signature": tracker["code_signature"],
# "network_signature": tracker["network_signature"],
# "types": ["tracker", "non-free"] # right now we assume all trackers in exodus are non-free
# })
class ScannerSignatureDataController(SignatureDataController):
def __init__(self):
super().__init__('Scanner signatures', 'scanner.yml')
def fetch_signatures_from_web(self):
url = "https://uniqx.gitlab.io/fdroid-scanner-signatures/sigs.json"
with urllib.request.urlopen(url) as f:
data = yaml.safe_load(f)
# TODO: validate parsed data
# TODO: error message 'please update fdroidserver/report' when fetching failed due to changes in the data strucutre
self.data = data
class SignatureTool():
def __init__(self):
self.sdcs = [ScannerSignatureDataController()]
for sdc in self.sdcs:
sdc.fetch_signatures_from_web()
# TODO: use cache
# if not sdc.check_cache():
# sdc.load_from_defaults()
self.compile_regexes()
def compile_regexes(self):
self.regex = {'code_signatures': {}}
for sdc in self.sdcs:
for lname, ldef in sdc.data.get('signatures', []).items():
self.regex['code_signatures'].update({(x, re.compile(x)) for x in ldef.get('code_signatures', [])})
def binary_signatures(self):
for sdc in self.sdcs:
for sig in sdc.binary_signatures():
yield sig
SIGNATURE_TOOL = SignatureTool()
# taken from exodus_core
def load_exodus_trackers_signatures():
"""
@ -215,7 +350,7 @@ def scan_binary(apkfile, extract_signatures=None):
result = get_embedded_classes(apkfile)
problems = 0
for classname in result:
for suspect, regexp in CODE_SIGNATURES.items():
for suspect, regexp in SIGNATURE_TOOL.regex['code_signatures'].items():
if regexp.match(classname):
logging.debug("Found class '%s'" % classname)
problems += 1