deploy: make androidobservatory and virustotal functions reusable

This should not change the logic at all, just make the loop runs into
standalone functions.
This commit is contained in:
Hans-Christoph Steiner 2020-03-03 14:38:14 +01:00
parent 733e7be1b3
commit b7901952a1
3 changed files with 134 additions and 105 deletions

1
.gitignore vendored
View file

@ -57,6 +57,7 @@ makebuildserver.config.py
/tests/repo/obb.mainpatch.current/en-US/icon_WI0pkO3LsklrsTAnRr-OQSxkkoMY41lYe2-fAvXLiLg=.png
/tests/repo/org.videolan.vlc/en-US/icon_yAfSvPRJukZzMMfUzvbYqwaD1XmHXNtiPBtuPVHW-6s=.png
/tests/urzip-πÇÇπÇÇ现代汉语通用字-български-عربي1234.apk
/tests/virustotal/
/unsigned/
# generated by gettext

View file

@ -19,6 +19,7 @@
import sys
import glob
import hashlib
import json
import os
import paramiko
import pwd
@ -447,9 +448,8 @@ def update_servergitmirrors(servergitmirrors, repo_section):
def upload_to_android_observatory(repo_section):
# depend on requests and lxml only if users enable AO
import requests
from lxml.html import fromstring
requests # stop unused import warning
if options.verbose:
logging.getLogger("requests").setLevel(logging.INFO)
@ -460,10 +460,19 @@ def upload_to_android_observatory(repo_section):
if repo_section == 'repo':
for f in sorted(glob.glob(os.path.join(repo_section, '*.apk'))):
fpath = f
fname = os.path.basename(f)
upload_apk_to_android_observatory(f)
def upload_apk_to_android_observatory(path):
# depend on requests and lxml only if users enable AO
import requests
from . import net
from lxml.html import fromstring
apkfilename = os.path.basename(path)
r = requests.post('https://androidobservatory.org/',
data={'q': update.sha256sum(f), 'searchby': 'hash'})
data={'q': update.sha256sum(path), 'searchby': 'hash'},
headers=net.HEADERS)
if r.status_code == 200:
# from now on XPath will be used to retrieve the message in the HTML
# androidobservatory doesn't have a nice API to talk with
@ -482,22 +491,22 @@ def upload_to_android_observatory(repo_section):
message = ''
if href:
message = (_('Found {apkfilename} at {url}')
.format(apkfilename=fname, url=(page + href)))
.format(apkfilename=apkfilename, url=(page + href)))
if message:
logging.debug(message)
continue
# upload the file with a post request
logging.info(_('Uploading {apkfilename} to androidobservatory.org')
.format(apkfilename=fname))
.format(apkfilename=apkfilename))
r = requests.post('https://androidobservatory.org/upload',
files={'apk': (fname, open(fpath, 'rb'))},
files={'apk': (apkfilename, open(path, 'rb'))},
headers=net.HEADERS,
allow_redirects=False)
def upload_to_virustotal(repo_section, virustotal_apikey):
import json
import requests
requests # stop unused import warning
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
@ -514,22 +523,32 @@ def upload_to_virustotal(repo_section, virustotal_apikey):
for packageName, packages in data['packages'].items():
for package in packages:
upload_apk_to_virustotal(virustotal_apikey, **package)
def upload_apk_to_virustotal(virustotal_apikey, packageName, apkName, hash,
versionCode, **kwargs):
import requests
outputfilename = os.path.join('virustotal',
packageName + '_' + str(package.get('versionCode'))
+ '_' + package['hash'] + '.json')
packageName + '_' + str(versionCode)
+ '_' + hash + '.json')
if os.path.exists(outputfilename):
logging.debug(package['apkName'] + ' results are in ' + outputfilename)
continue
filename = package['apkName']
repofilename = os.path.join(repo_section, filename)
logging.debug(apkName + ' results are in ' + outputfilename)
return outputfilename
repofilename = os.path.join('repo', apkName)
logging.info('Checking if ' + repofilename + ' is on virustotal')
headers = {
"User-Agent": "F-Droid"
}
if 'headers' in kwargs:
for k, v in kwargs['headers'].items():
headers[k] = v
data = {
'apikey': virustotal_apikey,
'resource': package['hash'],
'resource': hash,
}
needs_file_upload = False
while True:
@ -540,10 +559,11 @@ def upload_to_virustotal(repo_section, virustotal_apikey):
if response['response_code'] == 0:
needs_file_upload = True
else:
response['filename'] = filename
response['filename'] = apkName
response['packageName'] = packageName
response['versionCode'] = package.get('versionCode')
response['versionName'] = package.get('versionName')
response['versionCode'] = versionCode
if kwargs.get('versionName'):
response['versionName'] = kwargs.get('versionName')
with open(outputfilename, 'w') as fp:
json.dump(response, fp, indent=2, sort_keys=True)
@ -582,7 +602,7 @@ def upload_to_virustotal(repo_section, virustotal_apikey):
logging.info(_('Uploading {apkfilename} to virustotal')
.format(apkfilename=repofilename))
files = {
'file': (filename, open(repofilename, 'rb'))
'file': (apkName, open(repofilename, 'rb'))
}
r = requests.post(upload_url, data=data, headers=headers, files=files)
logging.debug(_('If this upload fails, try manually uploading to {url}')
@ -591,6 +611,8 @@ def upload_to_virustotal(repo_section, virustotal_apikey):
response = r.json()
logging.info(response['verbose_msg'] + " " + response['permalink'])
return outputfilename
def push_binary_transparency(git_repo_path, git_remote):
'''push the binary transparency git repo to the specifed remote.

View file

@ -142,6 +142,12 @@ class ServerTest(unittest.TestCase):
repo_section)
self.assertEqual(call_iteration, 2, 'expected 2 invocations of subprocess.call')
@unittest.skipIf(not os.getenv('VIRUSTOTAL_API_KEY'), 'VIRUSTOTAL_API_KEY is not set')
def test_upload_to_virustotal(self):
fdroidserver.server.options.verbose = True
virustotal_apikey = os.getenv('VIRUSTOTAL_API_KEY')
fdroidserver.server.upload_to_virustotal('repo', virustotal_apikey)
if __name__ == "__main__":
os.chdir(os.path.dirname(__file__))