# -*- coding: utf-8 -*-
# This file is a part of the Anitya project.
#
# Copyright © 2014-2017 Pierre-Yves Chibon <pingou@pingoured.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""SQLAlchemy database models."""
try:
# The Python 3.6+ API
from secrets import choice as random_choice
except ImportError:
# Fall back to random with os.urandom
import random
random = random.SystemRandom()
random_choice = random.choice
import datetime
import logging
import time
import string
import uuid
import six
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import validates
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.types import TypeDecorator, CHAR
from anitya.config import config as anitya_config
from anitya.lib.versions import GLOBAL_DEFAULT as DEFAULT_VERSION_SCHEME
from anitya.lib.plugins import ECOSYSTEM_PLUGINS, BACKEND_PLUGINS, VERSION_PLUGINS
from .meta import Base
_log = logging.getLogger(__name__)
def _paginate_query(query, page):
''' Paginate a given query to returned the specified page (if any).
'''
if page:
try:
page = int(page)
except ValueError:
page = None
if page:
limit = 50
offset = (page - 1) * limit
query = query.offset(offset).limit(limit)
return query
[docs]class Log(Base):
''' Simple table to store/log action occuring in the database. '''
__tablename__ = 'logs'
id = sa.Column(sa.Integer, primary_key=True)
user = sa.Column(sa.String(200), index=True, nullable=False)
project = sa.Column(sa.String(200), index=True, nullable=True)
distro = sa.Column(sa.String(200), index=True, nullable=True)
description = sa.Column(sa.Text, nullable=False)
created_on = sa.Column(sa.DateTime, default=datetime.datetime.utcnow)
def __init__(self, user, project=None, distro=None, description=None):
''' Constructor.
'''
self.user = user
self.project = project
self.distro = distro
self.description = description
[docs] @classmethod
def insert(cls, session, user, project=None, distro=None,
description=None):
""" Insert the given log entry into the database.
:arg session: the session to connect to the database with
:arg user: the username of the user doing the action
:arg project: the `Project` object of the project changed
:arg distro: the `Distro` object of the distro changed
:arg description: a short textual description of the action
performed
"""
project_name = None
if project:
project_name = project.name
distro_name = None
if distro:
distro_name = distro.name
log = Log(user=user, project=project_name, distro=distro_name,
description=description)
session.add(log)
session.flush()
[docs] @classmethod
def search(cls, session, project_name=None, from_date=None, user=None,
limit=None, offset=None, count=False):
""" Return the list of the last Log entries present in the database.
:arg cls: the class object
:arg session: the database session used to query the information.
:kwarg project_name: the name of the project to restrict the logs to.
:kwarg from_date: the date from which to give the entries.
:kwarg user: the name of the user to restrict the logs to.
:kwarg limit: limit the result to X rows.
:kwarg offset: start the result at row X.
:kwarg count: a boolean to return the result of a COUNT query
if true, returns the data if false (default).
"""
query = session.query(
cls
)
if project_name:
query = query.filter(cls.project == project_name)
if from_date:
query = query.filter(cls.created_on >= from_date)
if user:
if isinstance(user, (list, tuple)):
query = query.filter(cls.user.in_(user))
else:
query = query.filter(cls.user == user)
query = query.order_by(cls.created_on.desc())
if count:
return query.count()
if offset:
query = query.offset(offset)
if limit:
query = query.limit(limit)
return query.all()
[docs]class Distro(Base):
__tablename__ = 'distros'
name = sa.Column(sa.String(200), primary_key=True)
def __init__(self, name):
''' Constructor. '''
self.name = name
def __json__(self):
return dict(name=self.name)
[docs] @classmethod
def by_name(cls, session, name):
query = session.query(
cls
).filter(
sa.func.lower(cls.name) == sa.func.lower(name)
)
return query.first()
get = by_name
[docs] @classmethod
def all(cls, session, page=None, count=False):
query = session.query(cls).order_by(cls.name)
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs] @classmethod
def search(cls, session, pattern, page=None, count=False):
''' Search the distribuutions by their name '''
if '*' in pattern:
pattern = pattern.replace('*', '%')
query = session.query(
cls
).filter(
sa.or_(
sa.func.lower(cls.name).like(sa.func.lower(pattern)),
)
).order_by(
cls.name
).distinct()
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs] @classmethod
def get_or_create(cls, session, name):
distro = cls.by_name(session, name)
if not distro:
distro = cls(
name=name
)
session.add(distro)
session.flush()
return distro
[docs]class Packages(Base):
__tablename__ = 'packages'
id = sa.Column(sa.Integer, primary_key=True)
distro = sa.Column(
sa.String(200),
sa.ForeignKey(
"distros.name",
ondelete="cascade",
onupdate="cascade"))
project_id = sa.Column(
sa.Integer,
sa.ForeignKey(
"projects.id",
ondelete="cascade",
onupdate="cascade")
)
package_name = sa.Column(sa.String(200))
__table_args__ = (
sa.UniqueConstraint('distro', 'package_name'),
)
project = sa.orm.relation('Project')
def __repr__(self):
return '<Packages(%s, %s: %s)>' % (
self.project_id, self.distro, self.package_name)
def __json__(self):
return dict(
package_name=self.package_name,
distro=self.distro,
)
[docs] @classmethod
def by_id(cls, session, pkg_id):
return session.query(cls).filter_by(id=pkg_id).first()
[docs] @classmethod
def get(cls, session, project_id, distro, package_name):
query = session.query(
cls
).filter(
cls.project_id == project_id
).filter(
sa.func.lower(cls.distro) == sa.func.lower(distro)
).filter(
cls.package_name == package_name
)
return query.first()
[docs] @classmethod
def by_package_name_distro(cls, session, package_name, distro):
query = session.query(
cls
).filter(
cls.package_name == package_name
).filter(
sa.func.lower(cls.distro) == sa.func.lower(distro)
)
return query.first()
[docs]class Project(Base):
"""
Models an upstream project and maps it to a database table.
Attributes:
id (sa.Integer): The database primary key.
name (sa.String): The upstream project's name.
homepage (sa.String): The URL for the project's home page.
backend (sa.String): The name of the backend to use when fetching updates;
this is a foreign key to a :class:`Backend`.
ecosystem_name (sa.String): The name of the ecosystem this project is a part
of. If the project isn't part of an ecosystem (e.g. PyPI), use the homepage
URL.
version_url (sa.String): The url to use when polling for new versions. This
may be ignored if this project is part of an ecosystem with a fixed
URL (e.g. Cargo projects are on https://crates.io).
regex (sa.String): A Python ``re`` style regular expression that is applied
to the HTML from ``version_url`` to find versions.
insecure (sa.Boolean): Whether or not to validate the x509 certificate
offered by the server at ``version_url``. Defaults to ``False``.
latest_version (sa.Boolean): The latest version for the project, as determined
by the version sorting algorithm.
logs (sa.Text): The result of the last update.
updated_on (sa.DateTime): When the project was last updated.
created_on (sa.DateTime): When the project was created in Anitya.
packages (list): List of :class:`Package` objects which represent the
downstream packages for this project.
version_scheme (sa.String): The version scheme to use for this project.
If this is null, a default will be used. See the :mod:`anitya.lib.versions`
documentation for more information.
"""
__tablename__ = 'projects'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String(200), nullable=False, index=True)
homepage = sa.Column(sa.String(200), nullable=False)
backend = sa.Column(sa.String(200), default='custom')
ecosystem_name = sa.Column(sa.String(200), nullable=False, index=True)
version_url = sa.Column(sa.String(200), nullable=True)
regex = sa.Column(sa.String(200), nullable=True)
version_prefix = sa.Column(sa.String(200), nullable=True)
insecure = sa.Column(sa.Boolean, nullable=False, default=False)
version_scheme = sa.Column(sa.String(50), nullable=True)
latest_version = sa.Column(sa.String(50))
logs = sa.Column(sa.Text)
updated_on = sa.Column(sa.DateTime, server_default=sa.func.now(),
onupdate=sa.func.current_timestamp())
created_on = sa.Column(sa.DateTime, default=datetime.datetime.utcnow)
packages = sa.orm.relation('Packages')
__table_args__ = (
sa.UniqueConstraint('name', 'homepage'),
sa.UniqueConstraint('name', 'ecosystem_name',
name="UNIQ_PROJECT_NAME_PER_ECOSYSTEM"),
)
[docs] @validates('backend')
def validate_backend(self, key, value):
if value not in BACKEND_PLUGINS.get_plugin_names():
raise ValueError('Backend "{}" is not supported.'.format(value))
return value
@property
def versions(self):
''' Return list of all versions stored, sorted from newest to oldest.
'''
version_class = self.get_version_class()
versions = [
version_class(version=v_obj.version, prefix=self.version_prefix)
for v_obj in self.versions_obj
]
sorted_versions = reversed(sorted(versions))
return [v.version for v in sorted_versions]
[docs] def get_version_class(self):
"""
Get the class for the version scheme used by this project.
This will take into account the defaults set in the ecosystem, backend,
and globally. The version scheme locations are checked in the following
order and the first non-null result is returned:
1. On the project itself in the ``version_scheme`` column.
2. The project's ecosystem default, if the project is part of one.
3. The project's backend default, if the backend defines one.
4. The global default defined in :data:`anitya.lib.versions.GLOBAL_DEFAULT`
Returns:
anitya.lib.versions.Version: A ``Version`` sub-class.
"""
version_scheme = self.version_scheme
if not version_scheme and self.ecosystem_name:
ecosystem = ECOSYSTEM_PLUGINS.get_plugin(self.ecosystem_name)
if ecosystem is None:
# This project uses its URL as an ecosystem
version_scheme = DEFAULT_VERSION_SCHEME
else:
version_scheme = ecosystem.default_version_scheme
if not version_scheme and self.backend:
backend = BACKEND_PLUGINS.get_plugin(self.backend)
version_scheme = backend.default_version_scheme
if not version_scheme:
version_scheme = DEFAULT_VERSION_SCHEME
return VERSION_PLUGINS.get_plugin(version_scheme)
def __repr__(self):
return '<Project(%s, %s)>' % (self.name, self.homepage)
def __json__(self, detailed=False):
output = dict(
id=self.id,
name=self.name,
homepage=self.homepage,
regex=self.regex,
backend=self.backend,
version_url=self.version_url,
version=self.latest_version,
versions=self.versions,
created_on=time.mktime(self.created_on.timetuple()) if self.created_on else None,
updated_on=time.mktime(self.updated_on.timetuple()) if self.updated_on else None,
ecosystem=self.ecosystem_name,
)
if detailed:
output['packages'] = [pkg.__json__() for pkg in self.packages]
return output
[docs] @classmethod
def get_or_create(cls, session, name, homepage, backend='custom'):
project = cls.by_name_and_homepage(session, name, homepage)
if not project:
project = cls(name=name, homepage=homepage, backend=backend)
session.add(project)
session.flush()
return project
[docs] @classmethod
def by_name(cls, session, name):
return session.query(cls).filter_by(name=name).all()
[docs] @classmethod
def by_id(cls, session, project_id):
return session.query(cls).filter_by(id=project_id).first()
get = by_id
[docs] @classmethod
def by_homepage(cls, session, homepage):
return session.query(cls).filter_by(homepage=homepage).all()
[docs] @classmethod
def by_name_and_homepage(cls, session, name, homepage):
query = session.query(
cls
).filter(
cls.name == name
).filter(
cls.homepage == homepage
)
return query.first()
[docs] @classmethod
def by_name_and_ecosystem(cls, session, name, ecosystem):
try:
query = session.query(cls)
query = query.filter(cls.name == name, cls.ecosystem_name == ecosystem)
return query.one()
except NoResultFound:
return None
[docs] @classmethod
def all(cls, session, page=None, count=False):
query = session.query(
Project
).order_by(
sa.func.lower(Project.name)
)
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs] @classmethod
def by_distro(cls, session, distro, page=None, count=False):
query = session.query(
Project
).filter(
Project.id == Packages.project_id
).filter(
sa.func.lower(Packages.distro) == sa.func.lower(distro)
).order_by(
sa.func.lower(Project.name)
)
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs] @classmethod
def updated(
cls, session, status='updated', name=None, log=None,
page=None, count=False):
''' Method used to retrieve projects according to their logs and
how they performed at the last cron job.
:kwarg status: used to filter the projects based on how they
performed at the last cron run
:kwarg name: if present, will return the entries having the matching
name
:kwarg log: if present, will return the entries having the matching
log
:kwarg page: The page number of returned, pages contain 50 entries
:kwarg count: A boolean used to return either the list of entries
matching the criterias or just the COUNT of entries
'''
query = session.query(
Project
).order_by(
sa.func.lower(Project.name)
)
if status == 'updated':
query = query.filter(
Project.logs.isnot(None),
Project.logs == 'Version retrieved correctly',
)
elif status == 'failed':
query = query.filter(
Project.logs.isnot(None),
Project.logs != 'Version retrieved correctly',
~Project.logs.ilike('Something strange occured%'),
)
elif status == 'odd':
query = query.filter(
Project.logs.isnot(None),
Project.logs != 'Version retrieved correctly',
Project.logs.ilike('Something strange occured%'),
)
elif status == 'new':
query = query.filter(
Project.logs.is_(None),
)
elif status == 'never_updated':
query = query.filter(
Project.logs.isnot(None),
Project.logs != 'Version retrieved correctly',
Project.latest_version.is_(None),
)
if name:
if '*' in name:
name = name.replace('*', '%')
else:
name = '%' + name + '%'
query = query.filter(
Project.name.ilike(name),
)
if log:
if '*' in log:
log = log.replace('*', '%')
else:
log = '%' + log + '%'
query = query.filter(
Project.logs.ilike(log),
)
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs] @classmethod
def search(cls, session, pattern, distro=None, page=None, count=False):
''' Search the projects by their name or package name '''
query1 = session.query(
cls
)
if pattern:
pattern = pattern.replace('_', '\_')
if '*' in pattern:
pattern = pattern.replace('*', '%')
if '%' in pattern:
query1 = query1.filter(
Project.name.ilike(pattern)
)
else:
query1 = query1.filter(
Project.name == pattern
)
query2 = session.query(
cls
).filter(
Project.id == Packages.project_id
)
if pattern:
if '%' in pattern:
query2 = query2.filter(
Packages.package_name.ilike(pattern)
)
else:
query2 = query2.filter(
Packages.package_name == pattern
)
if distro is not None:
query1 = query1.filter(
Project.id == Packages.project_id
).filter(
sa.func.lower(Packages.distro) == sa.func.lower(distro)
)
query2 = query2.filter(
sa.func.lower(Packages.distro) == sa.func.lower(distro)
)
query = query1.distinct().union(
query2.distinct()
).order_by(
cls.name
)
query = _paginate_query(query, page)
if count:
return query.count()
else:
return query.all()
[docs]class ProjectVersion(Base):
__tablename__ = 'projects_versions'
project_id = sa.Column(
sa.Integer,
sa.ForeignKey(
"projects.id",
ondelete="cascade",
onupdate="cascade"),
primary_key=True,
)
version = sa.Column(sa.String(50), primary_key=True)
project = sa.orm.relation('Project', backref='versions_obj')
[docs]class ProjectFlag(Base):
__tablename__ = 'projects_flags'
id = sa.Column(sa.Integer, primary_key=True)
project_id = sa.Column(
sa.Integer,
sa.ForeignKey(
"projects.id",
ondelete="cascade",
onupdate="cascade")
)
reason = sa.Column(sa.Text, nullable=False)
user = sa.Column(sa.String(200), index=True, nullable=False)
state = sa.Column(sa.String(50), default='open', nullable=False)
created_on = sa.Column(sa.DateTime, default=datetime.datetime.utcnow)
updated_on = sa.Column(sa.DateTime, server_default=sa.func.now(),
onupdate=sa.func.current_timestamp())
project = sa.orm.relation('Project', backref='flags')
def __repr__(self):
return '<ProjectFlag(%s, %s, %s)>' % (self.project.name, self.user,
self.state)
def __json__(self, detailed=False):
output = dict(
id=self.id,
project=self.project.name,
user=self.user,
state=self.state,
created_on=time.mktime(self.created_on.timetuple()),
updated_on=time.mktime(self.updated_on.timetuple()),
)
if detailed:
output['reason'] = self.reason
return output
[docs] @classmethod
def all(cls, session, page=None, count=False):
query = session.query(
ProjectFlag
).order_by(ProjectFlag.created_on)
return query.all()
[docs] @classmethod
def search(cls, session, project_name=None, from_date=None, user=None,
state=None, limit=None, offset=None, count=False):
""" Return the list of the last Flag entries present in the database.
:arg cls: the class object
:arg session: the database session used to query the information.
:kwarg project_name: the name of the project to restrict the flags to.
:kwarg from_date: the date from which to give the entries.
:kwarg user: the name of the user to restrict the flags to.
:kwarg state: the flag's status (open or closed).
:kwarg limit: limit the result to X rows.
:kwarg offset: start the result at row X.
:kwarg count: a boolean to return the result of a COUNT query
if true, returns the data if false (default).
"""
query = session.query(
cls
)
if project_name:
query = query.filter(
cls.project_id == Project.id
) .filter(
Project.name == project_name
)
if from_date:
query = query.filter(cls.created_on >= from_date)
if user:
query = query.filter(cls.user == user)
if state:
query = query.filter(cls.state == state)
query = query.order_by(cls.created_on.desc())
if count:
return query.count()
if offset:
query = query.offset(offset)
if limit:
query = query.limit(limit)
return query.all()
[docs] @classmethod
def get(cls, session, flag_id):
query = session.query(
cls
).filter(
cls.id == flag_id)
return query.first()
[docs]class Run(Base):
__tablename__ = 'runs'
status = sa.Column(sa.String(20), primary_key=True)
created_on = sa.Column(
sa.DateTime, default=datetime.datetime.utcnow, primary_key=True)
[docs] @classmethod
def last_entry(cls, session):
''' Return the last log about the cron run. '''
query = session.query(
cls
).order_by(
cls.created_on.desc()
)
return query.first()
[docs]class GUID(TypeDecorator):
"""
Platform-independent GUID type.
If PostgreSQL is being used, use its native UUID type, otherwise use a CHAR(32) type.
"""
impl = CHAR
[docs] def load_dialect_impl(self, dialect):
"""
PostgreSQL has a native UUID type, so use it if we're using PostgreSQL.
Args:
dialect (sqlalchemy.engine.interfaces.Dialect): The dialect in use.
Returns:
sqlalchemy.types.TypeEngine: Either a PostgreSQL UUID or a CHAR(32) on other
dialects.
"""
if dialect.name == 'postgresql':
return dialect.type_descriptor(UUID())
else:
return dialect.type_descriptor(CHAR(32))
[docs] def process_bind_param(self, value, dialect):
"""
Process the value being bound.
If PostgreSQL is in use, just use the string representation of the UUID.
Otherwise, use the integer as a hex-encoded string.
Args:
value (object): The value that's being bound to the object.
dialect (sqlalchemy.engine.interfaces.Dialect): The dialect in use.
Returns:
str: The value of the UUID as a string.
"""
if value is None:
return value
elif dialect.name == 'postgresql':
return str(value)
else:
if not isinstance(value, uuid.UUID):
return "%.32x" % uuid.UUID(value).int
else:
# hexstring
return "%.32x" % value.int
[docs] def process_result_value(self, value, dialect):
"""
Casts the UUID value to the native Python type.
Args:
value (object): The database value.
dialect (sqlalchemy.engine.interfaces.Dialect): The dialect in use.
Returns:
uuid.UUID: The value as a Python :class:`uuid.UUID`.
"""
if value is None:
return value
else:
return uuid.UUID(value)
[docs]class User(Base):
"""
A table for Anitya users.
This table is intended to work with a table of third-party authentication
providers. Anitya does not support local users.
Attributes:
id (uuid.UUID): The primary key for the table.
email (str): The user's email.
username (str): The user's username, as retrieved from third-party authentication.
active (bool): Indicates whether the user is active. If false, users will not be
able to log in.
social_auth (sqlalchemy.orm.dynamic.AppenderQuery): The list of
:class:`social_flask_sqlalchemy.models.UserSocialAuth` entries for this user.
"""
__tablename__ = 'users'
id = sa.Column(GUID, primary_key=True, default=uuid.uuid4)
# SMTP says 256 is the maximum length of a path:
# https://tools.ietf.org/html/rfc5321#section-4.5.3
email = sa.Column(sa.String(256), nullable=False, index=True, unique=True)
username = sa.Column(sa.String(256), nullable=False, index=True, unique=True)
active = sa.Column(sa.Boolean, default=True)
@property
def admin(self):
"""
Determine if this user is an administrator.
Returns:
bool: True if the user is an administrator.
"""
return six.text_type(self.id) in anitya_config.get('ANITYA_WEB_ADMINS', [])
@property
def is_active(self):
"""
Implement the flask-login interface for determining if the user is active.
If a user is _not_ active, they are not allowed to log in.
Returns:
bool: True if the user is active.
"""
return self.active
@property
def is_anonymous(self):
"""
Implement the flask-login interface for determining if the user is authenticated.
flask-login uses an "anonymous user" object if there is no authenticated user. This
indicates to flask-login this user is not an anonymous user.
Returns:
bool: False in all cases.
"""
return False
@property
def is_authenticated(self):
"""
Implement the flask-login interface for determining if the user is authenticated.
In this case, if flask-login has an instance of :class:`User`, then that user has
already authenticated via a third-party authentication mechanism.
Returns:
bool: True in all cases.
"""
return True
[docs] def get_id(self):
"""
Implement the flask-login interface for retrieving the user's ID.
Returns:
six.text_type: The Unicode string that uniquely identifies a user.
"""
return six.text_type(self.id)
def _api_token_generator(charset=string.ascii_letters + string.digits, length=40):
"""
Generate an API token of a given length using the provided character set.
Args:
charset (str): A string of characters to choose from in the token.
length (int): The number of characters to use in the token.
Returns:
str: The API token as a unicode string.
"""
return u''.join(random_choice(charset) for __ in range(length))
[docs]class ApiToken(Base):
"""
A table for user API tokens.
Attributes:
token (sa.String): A 40 character string that represents the API token.
This is the primary key and is, by default, generated automatically.
created (sa.DateTime): The time this API token was created.
description (sa.Text): A user-provided description of what the API token is for.
user (User): The user this API token is associated with.
"""
__tablename__ = 'tokens'
token = sa.Column(sa.String(40), default=_api_token_generator, primary_key=True)
created = sa.Column(sa.DateTime, default=datetime.datetime.utcnow, nullable=False)
user_id = sa.Column(GUID, sa.ForeignKey('users.id'), nullable=False)
user = sa.orm.relationship(
'User',
lazy='joined',
backref=sa.orm.backref('api_tokens', cascade='all, delete-orphan'),
)
description = sa.Column(sa.Text, nullable=True)