Source code for spkrepo.utils

# -*- coding: utf-8 -*-
import base64
import binascii
import hashlib
import io
import json
import re
import tarfile
import time
from configparser import ConfigParser

import gnupg
import requests

from .exceptions import SPKParseError, SPKSignError


class SPK(object):
    """SPK utilities

    Parses an SPK archive (an uncompressed tar file) on construction and
    exposes its metadata through the following instance attributes:

    * ``info``: dict of the key/value pairs of the INFO file (boolean keys
      are converted to :class:`bool`, icon keys are stored in ``icons``)
    * ``icons``: dict mapping an icon size (``'72'``, ``'120'``, ``'256'``)
      to a :class:`io.BytesIO` holding the icon data
    * ``wizards``: set of wizard processes found in ``WIZARD_UIFILES``
    * ``license``: content of the LICENSE file or ``None``
    * ``signature``: content of the signature file or ``None``
    * ``conf_dependencies`` / ``conf_conflicts``: JSON dump of
      ``conf/PKG_DEPS`` / ``conf/PKG_CONX`` or ``None``

    :param fileobj stream: SPK file stream
    :raises SPKParseError: if the stream is not a valid SPK

    """
    #: Required keys in the INFO file
    REQUIRED_INFO = {'package', 'version', 'arch', 'firmware', 'displayname', 'description'}

    #: Boolean INFO keys
    BOOLEAN_INFO = {'startable', 'support_conf_folder'}

    #: Signature filename
    SIGNATURE_FILENAME = 'syno_signature.asc'

    #: Regex for a line of the INFO file
    info_line_re = re.compile(r'^(?P<key>\w+)="(?P<value>.*)"$', re.MULTILINE)

    #: Regex for package in INFO file
    package_re = re.compile(r'^[\w-]+$')

    #: Regex for a wizard filename
    wizard_filename_re = re.compile(r'^WIZARD_UIFILES/(?P<process>install|upgrade|uninstall)_uifile(?:_[a-z]{3})?$')

    #: Regex for icons in INFO
    icon_info_re = re.compile(r'^package_icon(?:_(?P<size>120|256))?$')

    #: Regex for icons in files
    icon_filename_re = re.compile(r'^PACKAGE_ICON(?:_(?P<size>120|256))?\.PNG$')

    #: Regex for files in scripts
    script_filename_re = re.compile(r'^scripts/.+$')

    #: Regex for files in conf
    conf_filename_re = re.compile(r'^conf/.+$')

    def __init__(self, stream):
        self.info = {}
        self.icons = {}
        self.wizards = set()
        self.license = None
        self.signature = None
        self.stream = stream
        self.conf_dependencies = None
        self.conf_conflicts = None

        self.stream.seek(0)
        try:
            with tarfile.open(fileobj=self.stream, mode='r:') as spk:
                names = spk.getnames()

                # check for required files
                if 'INFO' not in names:
                    raise SPKParseError('Missing INFO file')
                if 'package.tgz' not in names:
                    raise SPKParseError('Missing package.tgz file')

                # read LICENSE file
                if 'LICENSE' in names:
                    # decode utf-8
                    try:
                        self.license = spk.extractfile('LICENSE').read().decode('utf-8').strip()
                    except UnicodeDecodeError:
                        raise SPKParseError('Wrong LICENSE encoding')

                # read signature file
                if self.SIGNATURE_FILENAME in names:
                    # decode ascii
                    try:
                        self.signature = spk.extractfile(self.SIGNATURE_FILENAME).read().decode('ascii').strip()
                    except UnicodeDecodeError:
                        raise SPKParseError('Wrong %s encoding' % self.SIGNATURE_FILENAME)

                # read INFO lines
                for line in spk.extractfile('INFO').readlines():
                    # decode utf-8
                    try:
                        line = line.decode('utf-8').strip()
                    except UnicodeDecodeError:
                        raise SPKParseError('Wrong INFO encoding')

                    # skip blank line
                    if not line:
                        continue

                    # validate line
                    match = self.info_line_re.match(line)
                    if not match:
                        raise SPKParseError('Invalid INFO')
                    key, value = match.group('key'), match.group('value')

                    # read icons (base64 encoded in INFO, no size suffix means 72px)
                    match = self.icon_info_re.match(key)
                    if match:
                        size = match.group('size') or '72'
                        try:
                            self.icons[size] = io.BytesIO(base64.b64decode(value.encode('utf-8')))
                        except binascii.Error:
                            raise SPKParseError('Invalid INFO icon: %s' % key)

                    # read booleans
                    elif key in self.BOOLEAN_INFO:
                        if value == 'yes':
                            self.info[key] = True
                        elif value == 'no':
                            self.info[key] = False
                        else:
                            raise SPKParseError('Invalid INFO boolean: %s' % key)

                    # validate the package name
                    elif key == 'package':
                        if not self.package_re.match(value):
                            raise SPKParseError('Invalid INFO package')
                        self.info[key] = value

                    else:
                        self.info[key] = value

                # validate info (sorted so that the message is deterministic)
                missing_info = self.REQUIRED_INFO - set(self.info)
                if missing_info:
                    raise SPKParseError('Missing INFO: %s' % ', '.join(sorted(missing_info)))

                # read conf files
                if self.info.get('support_conf_folder'):
                    if 'conf' not in names:
                        raise SPKParseError('Missing conf folder')
                    if 'conf/PKG_DEPS' in names:
                        self.conf_dependencies = self._read_conf(spk, 'conf/PKG_DEPS')
                    if 'conf/PKG_CONX' in names:
                        self.conf_conflicts = self._read_conf(spk, 'conf/PKG_CONX')
                    if self.conf_dependencies is None and self.conf_conflicts is None:
                        raise SPKParseError('Empty conf folder')

                # verify checksum (MD5 of package.tgz, read by chunks)
                if 'checksum' in self.info:
                    checksum = hashlib.md5()
                    archive = spk.extractfile('package.tgz')
                    for chunk in iter(lambda: archive.read(io.DEFAULT_BUFFER_SIZE), b''):
                        checksum.update(chunk)
                    if checksum.hexdigest() != self.info['checksum']:
                        raise SPKParseError('Checksum mismatch')

                # read icon files (they override the icons found in INFO)
                for name in names:
                    match = self.icon_filename_re.match(name)
                    if match:
                        self.icons[match.group('size') or '72'] = io.BytesIO(spk.extractfile(name).read())

                # validate icons
                if '72' not in self.icons:
                    raise SPKParseError('Missing 72px icon')

                # read wizard files
                if 'WIZARD_UIFILES' in names:
                    for name in names:
                        match = self.wizard_filename_re.match(name)
                        if match:
                            self.wizards.add(match.group('process'))
        except tarfile.TarError:
            raise SPKParseError('Invalid SPK')
        self.stream.seek(0)

    @staticmethod
    def _read_conf(spk, name):
        """Parse the INI-style file `name` of the opened archive `spk` and return its sections as a JSON string"""
        parser = ConfigParser()
        try:
            parser.read_string(spk.extractfile(name).read().decode('utf-8'))
        except UnicodeDecodeError:
            raise SPKParseError('Wrong %s encoding' % name)
        return json.dumps({section: dict(parser.items(section)) for section in parser.sections()})

    def _signed_member_names(self, names):
        """Return the member names whose content is signed, in the order the content is concatenated"""
        names = sorted(names)

        # INFO and LICENSE first
        ordered = [name for name in ('INFO', 'LICENSE') if name in names]

        # then icons, wizards and conf
        for pattern in (self.icon_filename_re, self.wizard_filename_re, self.conf_filename_re):
            ordered.extend(name for name in names if pattern.match(name))

        # then package.tgz
        if 'package.tgz' in names:
            ordered.append('package.tgz')

        # and finally scripts
        ordered.extend(name for name in names if self.script_filename_re.match(name))

        return ordered

    def sign(self, timestamp_url, gnupghome):
        """
        Sign the package

        The signature is computed over the concatenated content of INFO,
        LICENSE, icons, wizards, conf files, package.tgz and scripts (in that
        order, names sorted within each group) and appended to the SPK stream
        as :attr:`SIGNATURE_FILENAME`.

        :param timestamp_url: url for the remote timestamping
        :param gnupghome: path to the gnupg home
        :raises ValueError: if the package is already signed

        """
        # check no signature exists
        if self.signature is not None:
            raise ValueError('Already signed')

        # collect the streams
        with io.BytesIO() as data_stream:
            self.stream.seek(0)
            with tarfile.open(fileobj=self.stream, mode='r:') as spk:
                for name in self._signed_member_names(spk.getnames()):
                    # extractfile returns None for members without content (e.g. directories)
                    member = spk.extractfile(name)
                    if member is not None:
                        data_stream.write(member.read())

            # generate the signature
            data_stream.seek(0)
            signature = self._generate_signature(data_stream, timestamp_url, gnupghome)

        # add the signature to the SPK
        signature_data = signature.encode('ascii')
        signature_tarinfo = tarfile.TarInfo(self.SIGNATURE_FILENAME)
        signature_tarinfo.mtime = time.time()
        signature_tarinfo.size = len(signature_data)
        self.stream.seek(0)
        with tarfile.open(fileobj=self.stream, mode='a:') as spk:
            spk.addfile(tarinfo=signature_tarinfo, fileobj=io.BytesIO(signature_data))
        self.stream.seek(0)

        # only flag the package as signed once the archive actually holds the signature
        self.signature = signature

    def unsign(self):
        """Remove the signature file of the package

        The SPK stream is rewritten in place without the
        :attr:`SIGNATURE_FILENAME` member and :attr:`signature` is reset to
        ``None``.

        :raises ValueError: if the package is not signed

        """
        # check signature exists
        if self.signature is None:
            raise ValueError('Not signed')

        # remove the signature file
        with io.BytesIO() as unsigned_stream:
            # copy every member but the signature to a temporary archive
            self.stream.seek(0)
            with tarfile.open(fileobj=self.stream, mode='r:') as spk:
                with tarfile.open(fileobj=unsigned_stream, mode='w:') as unsigned_spk:
                    for member in spk.getmembers():
                        if member.name == self.SIGNATURE_FILENAME:
                            continue
                        unsigned_spk.addfile(member, spk.extractfile(member))

            # overwrite the stream with the temporary archive
            unsigned_stream.seek(0)
            self.stream.seek(0)
            self.stream.write(unsigned_stream.read())
            self.stream.truncate()
            self.stream.seek(0)

        # keep the object state in sync with the archive
        self.signature = None

    def _generate_signature(self, stream, timestamp_url, gnupghome):  # pragma: no cover
        """Generate a remotely timestamped detached signature of `stream`

        :param stream: stream of the data to sign
        :param timestamp_url: url for the remote timestamping
        :param gnupghome: path to the gnupg home
        :return: the timestamped signature as returned by the timestamp server
        :raises SPKSignError: if the timestamp server times out, does not
                              answer with a 200 status code or if its response
                              cannot be verified

        """
        # generate the signature
        gpg = gnupg.GPG(gnupghome=gnupghome)
        signature = gpg.sign_file(stream, detach=True)

        # have the signature remotely timestamped
        try:
            response = requests.post(timestamp_url, files={'file': signature.data}, timeout=2)
        except requests.Timeout:
            raise SPKSignError('Timestamp server did not respond in time')

        # check the response status
        if response.status_code != 200:
            raise SPKSignError('Timestamp server returned with status code %d' % response.status_code)

        # verify the timestamp
        if not gpg.verify(response.content):
            raise SPKSignError('Cannot verify timestamp')

        response.encoding = 'ascii'
        return response.text