Source code for spkrepo.models

# -*- coding: utf-8 -*-
import io
import os
import shutil
from datetime import datetime, timedelta

from flask import current_app
from flask.ext.security import UserMixin, RoleMixin, SQLAlchemyUserDatastore
from flask.ext.sqlalchemy import before_models_committed, models_committed
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm.collections import attribute_mapped_collection

from .ext import db


user_role = db.Table('user_role',
                     db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
                     db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))

package_user_maintainer = db.Table('package_user_maintainer',
                                   db.Column('package_id', db.Integer(), db.ForeignKey('package.id')),
                                   db.Column('user_id', db.Integer(), db.ForeignKey('user.id')))


[docs]class User(db.Model, UserMixin): __tablename__ = 'user' # Columns id = db.Column(db.Integer, primary_key=True) username = db.Column(db.Unicode(50), unique=True, nullable=False) email = db.Column(db.Unicode(254), unique=True, nullable=False) password = db.Column(db.Unicode(255), nullable=False) api_key = db.Column(db.Unicode(64), unique=True) github_access_token = db.Column(db.Unicode(255)) active = db.Column(db.Boolean(), nullable=False) confirmed_at = db.Column(db.DateTime()) # Relationhips roles = db.relationship('Role', secondary='user_role', back_populates='users', lazy=False) authored_packages = db.relationship('Package', back_populates='author') maintained_packages = db.relationship('Package', secondary='package_user_maintainer', back_populates='maintainers') def __str__(self): return self.username def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.username)
[docs]class Role(db.Model, RoleMixin): __tablename__ = 'role' # Columns id = db.Column(db.Integer(), primary_key=True) name = db.Column(db.Unicode(50), unique=True, nullable=False) description = db.Column(db.Unicode(255)) # Relationhips users = db.relationship('User', secondary='user_role', back_populates='roles') @classmethod
[docs] def find(cls, name): return cls.query.filter(cls.name == name).first()
def __str__(self): return self.name def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.name)
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
[docs]class Architecture(db.Model): __tablename__ = 'architecture' # Columns id = db.Column(db.Integer, primary_key=True) code = db.Column(db.Unicode(20), unique=True, nullable=False) # Relationhips builds = db.relationship('Build', secondary='build_architecture', back_populates='architectures') # Other from_syno = {'88f6281': '88f628x', '88f6282': '88f628x'} to_syno = {'88f628x': '88f6281'} @classmethod
[docs] def find(cls, code, syno=False): if syno: return cls.query.filter(cls.code == cls.from_syno.get(code, code)).first() return cls.query.filter(cls.code == code).first()
def __str__(self): return self.code def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.code)
[docs]class Language(db.Model): __tablename__ = 'language' # Columns id = db.Column(db.Integer, primary_key=True) code = db.Column(db.Unicode(3), unique=True, nullable=False) name = db.Column(db.Unicode(50)) @classmethod
[docs] def find(cls, code): return cls.query.filter(cls.code == code).first()
def __str__(self): return self.name def __repr__(self): return '<{} [{}] {}>'.format(self.__class__.__name__, self.code, self.name)
[docs]class Firmware(db.Model): __tablename__ = 'firmware' # Columns id = db.Column(db.Integer, primary_key=True) version = db.Column(db.Unicode(3), nullable=False) build = db.Column(db.Integer, unique=True, nullable=False) @classmethod
[docs] def find(cls, build): return cls.query.filter(cls.build == build).first()
@property def firmware_string(self): return '%s-%d' % (self.version, self.build) def __str__(self): return self.firmware_string def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.firmware_string)
[docs]class Screenshot(db.Model): __tablename__ = 'screenshot' # Columns id = db.Column(db.Integer, primary_key=True) package_id = db.Column(db.Integer, db.ForeignKey('package.id'), nullable=False) path = db.Column(db.Unicode(100), nullable=False) # Relationhips package = db.relationship('Package', back_populates='screenshots')
[docs] def save(self, stream): with io.open(os.path.join(current_app.config['DATA_PATH'], self.path), 'wb') as f: f.write(stream.read())
def _after_insert(self): assert os.path.exists(os.path.join(current_app.config['DATA_PATH'], self.path)) def _after_delete(self): path = os.path.join(current_app.config['DATA_PATH'], self.path) if os.path.exists(path): os.remove(path) def __str__(self): return self.path def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.path)
[docs]class Icon(db.Model): __tablename__ = 'icon' # Columns id = db.Column(db.Integer, primary_key=True) version_id = db.Column(db.Integer, db.ForeignKey('version.id'), nullable=False) size = db.Column(db.Enum('72', '120', '256', name='icon_size'), nullable=False) path = db.Column(db.Unicode(100), nullable=False) # Constraints __table_args__ = (db.UniqueConstraint(version_id, size),) # Relationhips version = db.relationship('Version', back_populates='icons')
[docs] def save(self, stream): with io.open(os.path.join(current_app.config['DATA_PATH'], self.path), 'wb') as f: f.write(stream.read())
def _after_insert(self): assert os.path.exists(os.path.join(current_app.config['DATA_PATH'], self.path)) def _after_delete(self): path = os.path.join(current_app.config['DATA_PATH'], self.path) if os.path.exists(path): os.remove(path) def __str__(self): return self.path def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.path)
[docs]class Service(db.Model): __tablename__ = 'service' # Columns id = db.Column(db.Integer, primary_key=True) code = db.Column(db.Unicode(30), unique=True, nullable=False) @classmethod
[docs] def find(cls, code): return cls.query.filter(cls.code == code).first()
def __str__(self): return self.code def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.code)
[docs]class DisplayName(db.Model): __tablename__ = 'displayname' # Columns version_id = db.Column(db.Integer, db.ForeignKey('version.id'), nullable=False) language_id = db.Column(db.Integer, db.ForeignKey('language.id'), nullable=False) displayname = db.Column(db.Unicode(50), nullable=False) # Relationhips language = db.relationship('Language') # Constraints __table_args__ = (db.PrimaryKeyConstraint(version_id, language_id),) def __str__(self): return self.displayname def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.language.name)
[docs]class Description(db.Model): __tablename__ = 'description' # Columns version_id = db.Column(db.Integer, db.ForeignKey('version.id'), nullable=False) language_id = db.Column(db.Integer, db.ForeignKey('language.id'), nullable=False) description = db.Column(db.UnicodeText, nullable=False) # Relationhips language = db.relationship('Language') # Constraints __table_args__ = (db.PrimaryKeyConstraint(version_id, language_id),) def __str__(self): return self.description def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.language.name)
version_service_dependency = db.Table('version_service_dependency', db.Column('version_id', db.Integer(), db.ForeignKey('version.id')), db.Column('service_id', db.Integer(), db.ForeignKey('service.id')))
[docs]class Download(db.Model): __tablename__ = 'download' # Columns id = db.Column(db.Integer, primary_key=True) build_id = db.Column(db.Integer, db.ForeignKey('build.id'), nullable=False) architecture_id = db.Column(db.Integer, db.ForeignKey('architecture.id'), nullable=False) firmware_build = db.Column(db.Integer, nullable=False) ip_address = db.Column(db.Unicode(46), nullable=False) user_agent = db.Column(db.Unicode(255)) date = db.Column(db.DateTime, default=db.func.now(), nullable=False) # Relationships build = db.relationship('Build', back_populates='downloads') architecture = db.relationship('Architecture') def __str__(self): return self.ip_address def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.ip_address)
build_architecture = db.Table('build_architecture', db.Column('build_id', db.Integer(), db.ForeignKey('build.id')), db.Column('architecture_id', db.Integer(), db.ForeignKey('architecture.id')))
[docs]class Build(db.Model): __tablename__ = 'build' # Columns id = db.Column(db.Integer, primary_key=True) version_id = db.Column(db.Integer, db.ForeignKey('version.id'), nullable=False) firmware_id = db.Column(db.Integer, db.ForeignKey('firmware.id'), nullable=False) publisher_user_id = db.Column(db.Integer, db.ForeignKey('user.id')) checksum = db.Column(db.Unicode(32)) extract_size = db.Column(db.Integer) path = db.Column(db.Unicode(100)) md5 = db.Column(db.Unicode(32)) insert_date = db.Column(db.DateTime, default=db.func.now(), nullable=False) active = db.Column(db.Boolean(), default=False, nullable=False) # Relationships version = db.relationship('Version', back_populates='builds', lazy=False) architectures = db.relationship('Architecture', secondary='build_architecture', lazy=False) firmware = db.relationship('Firmware', lazy=False) publisher = db.relationship('User', foreign_keys=[publisher_user_id]) downloads = db.relationship('Download', back_populates='build') @classmethod
[docs] def generate_filename(cls, package, version, firmware, architectures): return '%s.v%d.f%d[%s].spk' % (package.name, version.version, firmware.build, '-'.join(a.code for a in architectures))
[docs] def save(self, stream): with io.open(os.path.join(current_app.config['DATA_PATH'], self.path), 'wb') as f: f.write(stream.read())
def _after_insert(self): assert os.path.exists(os.path.join(current_app.config['DATA_PATH'], self.path)) def _after_delete(self): path = os.path.join(current_app.config['DATA_PATH'], self.path) if os.path.exists(path): os.remove(path) def __str__(self): return self.path def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.path)
[docs]class Version(db.Model): __tablename__ = 'version' # Columns id = db.Column(db.Integer, primary_key=True) package_id = db.Column(db.Integer, db.ForeignKey('package.id'), nullable=False) version = db.Column(db.Integer, nullable=False, index=True) upstream_version = db.Column(db.Unicode(20), nullable=False) changelog = db.Column(db.UnicodeText) report_url = db.Column(db.Unicode(255)) distributor = db.Column(db.Unicode(50)) distributor_url = db.Column(db.Unicode(255)) maintainer = db.Column(db.Unicode(50)) maintainer_url = db.Column(db.Unicode(255)) dependencies = db.Column(db.Unicode(255)) conf_dependencies = db.Column(db.Unicode(255)) conflicts = db.Column(db.Unicode(255)) conf_conflicts = db.Column(db.Unicode(255)) install_wizard = db.Column(db.Boolean) upgrade_wizard = db.Column(db.Boolean) startable = db.Column(db.Boolean) license = db.Column(db.UnicodeText) insert_date = db.Column(db.DateTime, default=db.func.now(), nullable=False) # Relationhips package = db.relationship('Package', back_populates='versions', lazy=False) service_dependencies = db.relationship('Service', secondary='version_service_dependency') displaynames = db.relationship('DisplayName', cascade='all, delete-orphan', collection_class=attribute_mapped_collection('language.code')) descriptions = db.relationship('Description', cascade='all, delete-orphan', collection_class=attribute_mapped_collection('language.code')) icons = db.relationship('Icon', back_populates='version', cascade='all, delete-orphan', collection_class=attribute_mapped_collection('size')) builds = db.relationship('Build', back_populates='version', cascade='all, delete-orphan', cascade_backrefs=False) # Constraints __table_args__ = (db.UniqueConstraint(package_id, version),) @hybrid_property def version_string(self): return self.upstream_version + '-' + str(self.version) @hybrid_property def beta(self): return self.report_url != None @hybrid_property def all_builds_active(self): return all(b.active for b in self.builds) @all_builds_active.expression def all_builds_active(cls): return (db.select([db.func.count()]). where(db.and_(Build.version_id == cls.id, Build.active)). label('active_builds')) == \ (db.select([db.func.count()]). where(Build.version_id == cls.id). label('total_builds')) @property def path(self): return os.path.join(self.package.name, str(self.version)) def _after_insert(self): assert os.path.exists(os.path.join(current_app.config['DATA_PATH'], self.path)) def _after_delete(self): path = os.path.join(current_app.config['DATA_PATH'], self.path) if os.path.exists(path): shutil.rmtree(path) def __str__(self): return self.version_string def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.version_string)
[docs]class Package(db.Model): __tablename__ = 'package' # Columns id = db.Column(db.Integer, primary_key=True) author_user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='SET NULL')) name = db.Column(db.Unicode(50), nullable=False) insert_date = db.Column(db.DateTime, default=db.func.now(), nullable=False) download_count = db.column_property(db.select([db.func.count(Download.id)]). select_from(Download.__table__.join(Build).join(Version)). where(Version.package_id == id), deferred=True) recent_download_count = db.column_property(db.select([db.func.count(Download.id)]). select_from(Download.__table__.join(Build).join(Version)). where(db.and_(Version.package_id == id, Download.date >= datetime.now() - timedelta(days=90))). correlate_except(Download), deferred=True) # Relationhips versions = db.relationship('Version', back_populates='package', cascade='all, delete-orphan', cascade_backrefs=False, order_by='Version.version') screenshots = db.relationship('Screenshot', back_populates='package', cascade='all, delete-orphan') author = db.relationship('User', back_populates='authored_packages') maintainers = db.relationship('User', secondary='package_user_maintainer', back_populates='maintained_packages') # Constraints __table_args__ = (db.UniqueConstraint(name),) @classmethod
[docs] def find(cls, name): return cls.query.filter(cls.name == name).first()
def __str__(self): return self.name def __repr__(self): return '<{} {}>'.format(self.__class__.__name__, self.name)
@models_committed.connect def on_models_committed(sender, changes): for obj, change in changes: if change == 'insert' and hasattr(obj, '_after_insert'): obj._after_insert() elif change == 'delete' and hasattr(obj, '_after_delete'): obj._after_delete() @before_models_committed.connect def on_before_models_committed(sender, changes): for obj, change in changes: if change == 'insert' and hasattr(obj, '_before_insert'): obj._before_insert() elif change == 'delete' and hasattr(obj, '_before_delete'): obj._before_delete()