mirror of
https://github.com/f-droid/fdroidserver.git
synced 2025-11-05 15:00:30 +03:00
Add method for downloading (and verifying) a repository index
This includes some test cases to test the new code.
This commit is contained in:
parent
d8ad1c78c2
commit
a23da47118
5 changed files with 197 additions and 7 deletions
|
|
@ -28,11 +28,17 @@ import os
|
|||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
import zipfile
|
||||
from binascii import hexlify, unhexlify
|
||||
from datetime import datetime
|
||||
from xml.dom.minidom import Document
|
||||
|
||||
import requests
|
||||
from pyasn1.codec.der import decoder, encoder
|
||||
from pyasn1_modules import rfc2315
|
||||
|
||||
from fdroidserver import metadata, signindex, common
|
||||
from fdroidserver.common import FDroidPopen, FDroidPopenBytes
|
||||
from fdroidserver.metadata import MetaDataException
|
||||
|
|
@ -535,3 +541,103 @@ def get_raw_mirror(url):
|
|||
|
||||
url = "/".join(url)
|
||||
return url
|
||||
|
||||
|
||||
class VerificationException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def download_repo_index(url_str, verify_fingerprint=True):
|
||||
"""
|
||||
Downloads the repository index from the given :param url_str
|
||||
and verifies the repository's fingerprint if :param verify_fingerprint is not False.
|
||||
|
||||
:raises: VerificationException() if the repository could not be verified
|
||||
|
||||
:return: The index in JSON format.
|
||||
"""
|
||||
url = urllib.parse.urlsplit(url_str)
|
||||
|
||||
fingerprint = None
|
||||
if verify_fingerprint:
|
||||
query = urllib.parse.parse_qs(url.query)
|
||||
if 'fingerprint' not in query:
|
||||
raise VerificationException("No fingerprint in URL.")
|
||||
fingerprint = query['fingerprint'][0]
|
||||
|
||||
url = urllib.parse.SplitResult(url.scheme, url.netloc, url.path + '/index-v1.jar', '', '')
|
||||
r = requests.get(url.geturl())
|
||||
|
||||
with tempfile.NamedTemporaryFile() as fp:
|
||||
# write and open JAR file
|
||||
fp.write(r.content)
|
||||
jar = zipfile.ZipFile(fp)
|
||||
|
||||
# verify that the JAR signature is valid
|
||||
verify_jar_signature(fp.name)
|
||||
|
||||
# get public key and its fingerprint from JAR
|
||||
public_key, public_key_fingerprint = get_public_key_from_jar(jar)
|
||||
|
||||
# compare the fingerprint if verify_fingerprint is True
|
||||
if verify_fingerprint and fingerprint.upper() != public_key_fingerprint:
|
||||
raise VerificationException("The repository's fingerprint does not match.")
|
||||
|
||||
# load repository index from JSON
|
||||
index = json.loads(jar.read('index-v1.json').decode("utf-8"))
|
||||
index["repo"]["pubkey"] = hexlify(public_key).decode("utf-8")
|
||||
index["repo"]["fingerprint"] = public_key_fingerprint
|
||||
|
||||
# turn the apps into App objects
|
||||
index["apps"] = [metadata.App(app) for app in index["apps"]]
|
||||
|
||||
return index
|
||||
|
||||
|
||||
def verify_jar_signature(file):
|
||||
"""
|
||||
Verifies the signature of a given JAR file.
|
||||
|
||||
:raises: VerificationException() if the JAR's signature could not be verified
|
||||
"""
|
||||
if not common.verify_apk_signature(file, jar=True):
|
||||
raise VerificationException("The repository's index could not be verified.")
|
||||
|
||||
|
||||
def get_public_key_from_jar(jar):
|
||||
"""
|
||||
Get the public key and its fingerprint from a JAR file.
|
||||
|
||||
:raises: VerificationException() if the JAR was not signed exactly once
|
||||
|
||||
:param jar: a zipfile.ZipFile object
|
||||
:return: the public key from the jar and its fingerprint
|
||||
"""
|
||||
# extract certificate from jar
|
||||
certs = [n for n in jar.namelist() if common.CERT_PATH_REGEX.match(n)]
|
||||
if len(certs) < 1:
|
||||
raise VerificationException("Found no signing certificates for repository.")
|
||||
if len(certs) > 1:
|
||||
raise VerificationException("Found multiple signing certificates for repository.")
|
||||
|
||||
# extract public key from certificate
|
||||
public_key = get_public_key_from_certificate(jar.read(certs[0]))
|
||||
public_key_fingerprint = common.get_cert_fingerprint(public_key).replace(' ', '')
|
||||
|
||||
return public_key, public_key_fingerprint
|
||||
|
||||
|
||||
def get_public_key_from_certificate(certificate_file):
|
||||
"""
|
||||
Extracts a public key from the given certificate.
|
||||
:param certificate_file: file bytes (as string) representing the certificate
|
||||
:return: A binary representation of the certificate's public key
|
||||
"""
|
||||
content = decoder.decode(certificate_file, asn1Spec=rfc2315.ContentInfo())[0]
|
||||
if content.getComponentByName('contentType') != rfc2315.signedData:
|
||||
raise VerificationException("Unexpected certificate format.")
|
||||
content = decoder.decode(content.getComponentByName('content'),
|
||||
asn1Spec=rfc2315.SignedData())[0]
|
||||
certificates = content.getComponentByName('certificates')
|
||||
cert = certificates[0].getComponentByName('certificate')
|
||||
return encoder.encode(cert)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue