mirror of
				https://github.com/f-droid/fdroidserver.git
				synced 2025-11-04 14:30:30 +03:00 
			
		
		
		
	Merge branch 'fix-verify-reporting' into 'master'
verify: fix generating JSON reports, as used in verification.f-droid.org See merge request fdroid/fdroidserver!1168
This commit is contained in:
		
						commit
						30284ed31c
					
				
					 3 changed files with 173 additions and 57 deletions
				
			
		| 
						 | 
				
			
			@ -249,6 +249,7 @@ black:
 | 
			
		|||
        fdroidserver/readmeta.py
 | 
			
		||||
        fdroidserver/signindex.py
 | 
			
		||||
        fdroidserver/tail.py
 | 
			
		||||
        fdroidserver/verify.py
 | 
			
		||||
        setup.py
 | 
			
		||||
        tests/build.TestCase
 | 
			
		||||
        tests/deploy.TestCase
 | 
			
		||||
| 
						 | 
				
			
			@ -263,6 +264,7 @@ black:
 | 
			
		|||
        tests/ndk-release-checksums.py
 | 
			
		||||
        tests/rewritemeta.TestCase
 | 
			
		||||
        tests/signindex.TestCase
 | 
			
		||||
        tests/verify.TestCase
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
fedora_latest:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,39 +34,6 @@ options = None
 | 
			
		|||
config = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class hashabledict(OrderedDict):
 | 
			
		||||
    def __key(self):
 | 
			
		||||
        return tuple((k, self[k]) for k in sorted(self))
 | 
			
		||||
 | 
			
		||||
    def __hash__(self):
 | 
			
		||||
        try:
 | 
			
		||||
            return hash(self.__key())
 | 
			
		||||
        except TypeError as e:
 | 
			
		||||
            print(self.__key())
 | 
			
		||||
            raise e
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, other):
 | 
			
		||||
        return self.__key() == other.__key()
 | 
			
		||||
 | 
			
		||||
    def __lt__(self, other):
 | 
			
		||||
        return self.__key() < other.__key()
 | 
			
		||||
 | 
			
		||||
    def __qt__(self, other):
 | 
			
		||||
        return self.__key() > other.__key()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Decoder(json.JSONDecoder):
 | 
			
		||||
    def __init__(self, **kwargs):
 | 
			
		||||
        json.JSONDecoder.__init__(self, **kwargs)
 | 
			
		||||
        self.parse_array = self.JSONArray
 | 
			
		||||
        # Use the python implemenation of the scanner
 | 
			
		||||
        self.scan_once = json.scanner.py_make_scanner(self)
 | 
			
		||||
 | 
			
		||||
    def JSONArray(self, s_and_end, scan_once, **kwargs):
 | 
			
		||||
        values, end = json.decoder.JSONArray(s_and_end, scan_once, **kwargs)
 | 
			
		||||
        return set(values), end
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _add_diffoscope_info(d):
 | 
			
		||||
    """Add diffoscope setup metadata to provided dict under 'diffoscope' key.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -76,23 +43,25 @@ def _add_diffoscope_info(d):
 | 
			
		|||
    """
 | 
			
		||||
    try:
 | 
			
		||||
        import diffoscope
 | 
			
		||||
        d['diffoscope'] = hashabledict()
 | 
			
		||||
 | 
			
		||||
        d['diffoscope'] = dict()
 | 
			
		||||
        d['diffoscope']['VERSION'] = diffoscope.VERSION
 | 
			
		||||
 | 
			
		||||
        from diffoscope.comparators import ComparatorManager
 | 
			
		||||
 | 
			
		||||
        ComparatorManager().reload()
 | 
			
		||||
 | 
			
		||||
        from diffoscope.tools import tool_check_installed, tool_required
 | 
			
		||||
 | 
			
		||||
        external_tools = sorted(tool_required.all)
 | 
			
		||||
        external_tools = [
 | 
			
		||||
            tool
 | 
			
		||||
            for tool in external_tools
 | 
			
		||||
            if not tool_check_installed(tool)
 | 
			
		||||
            tool for tool in external_tools if not tool_check_installed(tool)
 | 
			
		||||
        ]
 | 
			
		||||
        d['diffoscope']['External-Tools-Required'] = tuple(external_tools)
 | 
			
		||||
        d['diffoscope']['External-Tools-Required'] = external_tools
 | 
			
		||||
 | 
			
		||||
        from diffoscope.tools import OS_NAMES, get_current_os
 | 
			
		||||
        from diffoscope.external_tools import EXTERNAL_TOOLS
 | 
			
		||||
 | 
			
		||||
        current_os = get_current_os()
 | 
			
		||||
        os_list = [current_os] if (current_os in OS_NAMES) else iter(OS_NAMES)
 | 
			
		||||
        for os_ in os_list:
 | 
			
		||||
| 
						 | 
				
			
			@ -102,10 +71,12 @@ def _add_diffoscope_info(d):
 | 
			
		|||
                    tools.add(EXTERNAL_TOOLS[x][os_])
 | 
			
		||||
                except KeyError:
 | 
			
		||||
                    pass
 | 
			
		||||
            d['diffoscope']['Available-in-{}-packages'.format(OS_NAMES[os_])] = tuple(sorted(tools))
 | 
			
		||||
            tools = sorted(tools)
 | 
			
		||||
            d['diffoscope']['Available-in-{}-packages'.format(OS_NAMES[os_])] = tools
 | 
			
		||||
 | 
			
		||||
        from diffoscope.tools import python_module_missing
 | 
			
		||||
        d['diffoscope']['Missing-Python-Modules'] = tuple(sorted(python_module_missing.modules))
 | 
			
		||||
        from diffoscope.tools import python_module_missing as pmm
 | 
			
		||||
 | 
			
		||||
        d['diffoscope']['Missing-Python-Modules'] = sorted(pmm.modules)
 | 
			
		||||
    except ImportError:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -118,6 +89,9 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result):
 | 
			
		|||
    ensure that there is only one report per file, even when run
 | 
			
		||||
    repeatedly.
 | 
			
		||||
 | 
			
		||||
    The output is run through JSON to normalize things like tuples vs
 | 
			
		||||
    lists.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    jsonfile = unsigned_apk + '.json'
 | 
			
		||||
    if os.path.exists(jsonfile):
 | 
			
		||||
| 
						 | 
				
			
			@ -125,22 +99,25 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result):
 | 
			
		|||
            data = json.load(fp, object_pairs_hook=OrderedDict)
 | 
			
		||||
    else:
 | 
			
		||||
        data = OrderedDict()
 | 
			
		||||
    output = hashabledict()
 | 
			
		||||
    output = dict()
 | 
			
		||||
    _add_diffoscope_info(output)
 | 
			
		||||
    output['url'] = url
 | 
			
		||||
    for key, filename in (('local', unsigned_apk), ('remote', remote_apk)):
 | 
			
		||||
        d = hashabledict()
 | 
			
		||||
        d = dict()
 | 
			
		||||
        output[key] = d
 | 
			
		||||
        d['file'] = filename
 | 
			
		||||
        d['sha256'] = common.sha256sum(filename)
 | 
			
		||||
        d['timestamp'] = os.stat(filename).st_ctime
 | 
			
		||||
        d['packageName'], d['versionCode'], d['versionName'] = common.get_apk_id(filename)
 | 
			
		||||
        d['packageName'], d['versionCode'], d['versionName'] = common.get_apk_id(
 | 
			
		||||
            filename
 | 
			
		||||
        )
 | 
			
		||||
    if compare_result:
 | 
			
		||||
        output['verified'] = False
 | 
			
		||||
        output['result'] = compare_result
 | 
			
		||||
    else:
 | 
			
		||||
        output['verified'] = True
 | 
			
		||||
    data[str(output['local']['timestamp'])] = output  # str makes better dict keys than float
 | 
			
		||||
    # str makes better dict keys than float
 | 
			
		||||
    data[str(output['local']['timestamp'])] = output
 | 
			
		||||
    with open(jsonfile, 'w') as fp:
 | 
			
		||||
        json.dump(data, fp, sort_keys=True)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -148,14 +125,22 @@ def write_json_report(url, remote_apk, unsigned_apk, compare_result):
 | 
			
		|||
        jsonfile = 'unsigned/verified.json'
 | 
			
		||||
        if os.path.exists(jsonfile):
 | 
			
		||||
            with open(jsonfile) as fp:
 | 
			
		||||
                data = json.load(fp, cls=Decoder, object_pairs_hook=hashabledict)
 | 
			
		||||
                data = json.load(fp)
 | 
			
		||||
        else:
 | 
			
		||||
            data = OrderedDict()
 | 
			
		||||
            data['packages'] = OrderedDict()
 | 
			
		||||
        packageName = output['local']['packageName']
 | 
			
		||||
 | 
			
		||||
        if packageName not in data['packages']:
 | 
			
		||||
            data['packages'][packageName] = set()
 | 
			
		||||
        data['packages'][packageName].add(output)
 | 
			
		||||
            data['packages'][packageName] = []
 | 
			
		||||
        found = False
 | 
			
		||||
        output_dump = json.dumps(output, sort_keys=True)
 | 
			
		||||
        for p in data['packages'][packageName]:
 | 
			
		||||
            if output_dump == json.dumps(p, sort_keys=True):
 | 
			
		||||
                found = True
 | 
			
		||||
                break
 | 
			
		||||
        if not found:
 | 
			
		||||
            data['packages'][packageName].insert(0, json.loads(output_dump))
 | 
			
		||||
        with open(jsonfile, 'w') as fp:
 | 
			
		||||
            json.dump(data, fp, cls=common.Encoder, sort_keys=True)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -165,13 +150,27 @@ def main():
 | 
			
		|||
    global options, config
 | 
			
		||||
 | 
			
		||||
    # Parse command line...
 | 
			
		||||
    parser = ArgumentParser(usage="%(prog)s [options] [APPID[:VERCODE] [APPID[:VERCODE] ...]]")
 | 
			
		||||
    parser = ArgumentParser(
 | 
			
		||||
        usage="%(prog)s [options] [APPID[:VERCODE] [APPID[:VERCODE] ...]]"
 | 
			
		||||
    )
 | 
			
		||||
    common.setup_global_opts(parser)
 | 
			
		||||
    parser.add_argument("appid", nargs='*', help=_("application ID with optional versionCode in the form APPID[:VERCODE]"))
 | 
			
		||||
    parser.add_argument("--reuse-remote-apk", action="store_true", default=False,
 | 
			
		||||
                        help=_("Verify against locally cached copy rather than redownloading."))
 | 
			
		||||
    parser.add_argument("--output-json", action="store_true", default=False,
 | 
			
		||||
                        help=_("Output JSON report to file named after APK."))
 | 
			
		||||
    parser.add_argument(
 | 
			
		||||
        "appid",
 | 
			
		||||
        nargs='*',
 | 
			
		||||
        help=_("application ID with optional versionCode in the form APPID[:VERCODE]"),
 | 
			
		||||
    )
 | 
			
		||||
    parser.add_argument(
 | 
			
		||||
        "--reuse-remote-apk",
 | 
			
		||||
        action="store_true",
 | 
			
		||||
        default=False,
 | 
			
		||||
        help=_("Verify against locally cached copy rather than redownloading."),
 | 
			
		||||
    )
 | 
			
		||||
    parser.add_argument(
 | 
			
		||||
        "--output-json",
 | 
			
		||||
        action="store_true",
 | 
			
		||||
        default=False,
 | 
			
		||||
        help=_("Output JSON report to file named after APK."),
 | 
			
		||||
    )
 | 
			
		||||
    options = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    config = common.read_config(options)
 | 
			
		||||
| 
						 | 
				
			
			@ -218,10 +217,15 @@ def main():
 | 
			
		|||
                    net.download_file(url, dldir=tmp_dir)
 | 
			
		||||
                except requests.exceptions.HTTPError:
 | 
			
		||||
                    try:
 | 
			
		||||
                        net.download_file(url.replace('/repo', '/archive'), dldir=tmp_dir)
 | 
			
		||||
                        net.download_file(
 | 
			
		||||
                            url.replace('/repo', '/archive'), dldir=tmp_dir
 | 
			
		||||
                        )
 | 
			
		||||
                    except requests.exceptions.HTTPError as e:
 | 
			
		||||
                        raise FDroidException(_('Downloading {url} failed. {error}')
 | 
			
		||||
                                              .format(url=url, error=e)) from e
 | 
			
		||||
                        raise FDroidException(
 | 
			
		||||
                            _('Downloading {url} failed. {error}').format(
 | 
			
		||||
                                url=url, error=e
 | 
			
		||||
                            )
 | 
			
		||||
                        ) from e
 | 
			
		||||
 | 
			
		||||
            unsigned_apk = os.path.join(unsigned_dir, apkfilename)
 | 
			
		||||
            compare_result = common.verify_apks(remote_apk, unsigned_apk, tmp_dir)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										110
									
								
								tests/verify.TestCase
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										110
									
								
								tests/verify.TestCase
									
										
									
									
									
										Executable file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,110 @@
 | 
			
		|||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import inspect
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import optparse
 | 
			
		||||
import os
 | 
			
		||||
import shutil
 | 
			
		||||
import sys
 | 
			
		||||
import tempfile
 | 
			
		||||
import unittest
 | 
			
		||||
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from unittest.mock import patch
 | 
			
		||||
 | 
			
		||||
localmodule = os.path.realpath(
 | 
			
		||||
    os.path.join(os.path.dirname(inspect.getfile(inspect.currentframe())), '..')
 | 
			
		||||
)
 | 
			
		||||
print('localmodule: ' + localmodule)
 | 
			
		||||
if localmodule not in sys.path:
 | 
			
		||||
    sys.path.insert(0, localmodule)
 | 
			
		||||
 | 
			
		||||
from fdroidserver import common, verify
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
TEST_APP_ENTRY = {
 | 
			
		||||
    "1539780240.3885746": {
 | 
			
		||||
        "local": {
 | 
			
		||||
            "file": "unsigned/com.politedroid_6.apk",
 | 
			
		||||
            "packageName": "com.politedroid",
 | 
			
		||||
            "sha256": "70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d",
 | 
			
		||||
            "timestamp": 1234567.8900000,
 | 
			
		||||
            "versionCode": "6",
 | 
			
		||||
            "versionName": "1.5",
 | 
			
		||||
        },
 | 
			
		||||
        "remote": {
 | 
			
		||||
            "file": "tmp/com.politedroid_6.apk",
 | 
			
		||||
            "packageName": "com.politedroid",
 | 
			
		||||
            "sha256": "70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d",
 | 
			
		||||
            "timestamp": 1234567.8900000,
 | 
			
		||||
            "versionCode": "6",
 | 
			
		||||
            "versionName": "1.5",
 | 
			
		||||
        },
 | 
			
		||||
        "url": "https://f-droid.org/repo/com.politedroid_6.apk",
 | 
			
		||||
        "verified": True,
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class VerifyTest(unittest.TestCase):
 | 
			
		||||
 | 
			
		||||
    basedir = Path(__file__).resolve().parent
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        logging.basicConfig(level=logging.DEBUG)
 | 
			
		||||
        self.tempdir = tempfile.TemporaryDirectory()
 | 
			
		||||
        os.chdir(self.tempdir.name)
 | 
			
		||||
        self.repodir = Path('repo')
 | 
			
		||||
        self.repodir.mkdir()
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        self.tempdir.cleanup()
 | 
			
		||||
 | 
			
		||||
    @patch('fdroidserver.common.sha256sum')
 | 
			
		||||
    def test_write_json_report(self, sha256sum):
 | 
			
		||||
        sha256sum.return_value = (
 | 
			
		||||
            '70c2f776a2bac38a58a7d521f96ee0414c6f0fb1de973c3ca8b10862a009247d'
 | 
			
		||||
        )
 | 
			
		||||
        os.mkdir('tmp')
 | 
			
		||||
        os.mkdir('unsigned')
 | 
			
		||||
        verified_json = Path('unsigned/verified.json')
 | 
			
		||||
        packageName = 'com.politedroid'
 | 
			
		||||
        apk_name = packageName + '_6.apk'
 | 
			
		||||
        remote_apk = 'tmp/' + apk_name
 | 
			
		||||
        unsigned_apk = 'unsigned/' + apk_name
 | 
			
		||||
        # TODO common.use apk_strip_v1_signatures() on unsigned_apk
 | 
			
		||||
        shutil.copy(str(self.basedir / 'repo' / apk_name), remote_apk)
 | 
			
		||||
        shutil.copy(str(self.basedir / 'repo' / apk_name), unsigned_apk)
 | 
			
		||||
        url = TEST_APP_ENTRY['1539780240.3885746']['url']
 | 
			
		||||
 | 
			
		||||
        self.assertFalse(verified_json.exists())
 | 
			
		||||
        verify.write_json_report(url, remote_apk, unsigned_apk, {})
 | 
			
		||||
        self.assertTrue(verified_json.exists())
 | 
			
		||||
        # smoke check status JSON
 | 
			
		||||
        with verified_json.open() as fp:
 | 
			
		||||
            firstpass = json.load(fp)
 | 
			
		||||
 | 
			
		||||
        verify.write_json_report(url, remote_apk, unsigned_apk, {})
 | 
			
		||||
        with verified_json.open() as fp:
 | 
			
		||||
            secondpass = json.load(fp)
 | 
			
		||||
 | 
			
		||||
        self.assertEqual(firstpass, secondpass)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    os.chdir(os.path.dirname(__file__))
 | 
			
		||||
 | 
			
		||||
    parser = optparse.OptionParser()
 | 
			
		||||
    parser.add_option(
 | 
			
		||||
        "-v",
 | 
			
		||||
        "--verbose",
 | 
			
		||||
        action="store_true",
 | 
			
		||||
        default=False,
 | 
			
		||||
        help="Spew out even more information than normal",
 | 
			
		||||
    )
 | 
			
		||||
    (common.options, args) = parser.parse_args(['--verbose'])
 | 
			
		||||
 | 
			
		||||
    newSuite = unittest.TestSuite()
 | 
			
		||||
    newSuite.addTest(unittest.makeSuite(VerifyTest))
 | 
			
		||||
    unittest.main(failfast=False)
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue