Source code for sqlobject.sqlbuilder

"""
sqlobject.sqlbuilder
--------------------

:author: Ian Bicking <ianb@colorstudy.com>

Builds SQL expressions from normal Python expressions.

Disclaimer
----------

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation; either version 2.1 of the
License, or (at your option any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301,
USA.

Instructions
------------

To begin a SQL expression, you must use some sort of SQL object -- a
field, table, or SQL statement (``SELECT``, ``INSERT``, etc.)  You can
then use normal operators, with the exception of: `and`, `or`, `not`,
and `in`.  You can use the `AND`, `OR`, `NOT`, and `IN` functions
instead, or you can also use `&`, `|`, and `~` for `and`, `or`, and
`not` respectively (however -- the precidence for these operators
doesn't work as you would want, so you must use many parenthesis).

To create a sql field, table, or constant/function, use the namespaces
`table`, `const`, and `func`.  For instance, ``table.address`` refers
to the ``address`` table, and ``table.address.state`` refers to the
``state`` field in the address table.  ``const.NULL`` is the ``NULL``
SQL constant, and ``func.NOW()`` is the ``NOW()`` function call
(`const` and `func` are actually identicle, but the two names are
provided for clarity).  Once you create this object, expressions
formed with it will produce SQL statements.

The ``sqlrepr(obj)`` function gets the SQL representation of these
objects, as well as the proper SQL representation of basic Python
types (None==NULL).

There are a number of DB-specific SQL features that this does not
implement.  There are a bunch of normal ANSI features also not present.

See the bottom of this module for some examples, and run it (i.e.
``python sql.py``) to see the results of those examples.

"""

########################################
# Constants
########################################

import fnmatch
import operator
import re
import threading
import types
import weakref

from . import classregistry
from .converters import registerConverter, sqlrepr, quote_str, unquote_str
from .compat import PY2, string_type


[docs]class VersionError(Exception): pass
[docs]class NoDefault: pass
[docs]class SQLObjectState(object): def __init__(self, soObject, connection=None): self.soObject = weakref.proxy(soObject) self.connection = connection
safeSQLRE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_\.]*$')
[docs]def sqlIdentifier(obj): # some db drivers return unicode column names return isinstance(obj, string_type) and bool(safeSQLRE.search(obj.strip()))
[docs]def execute(expr, executor): if hasattr(expr, 'execute'): return expr.execute(executor) else: return expr
def _str_or_sqlrepr(expr, db): if isinstance(expr, string_type): return expr return sqlrepr(expr, db) ######################################## # Expression generation ########################################
[docs]class SQLExpression: def __add__(self, other): return SQLOp("+", self, other) def __radd__(self, other): return SQLOp("+", other, self) def __sub__(self, other): return SQLOp("-", self, other) def __rsub__(self, other): return SQLOp("-", other, self) def __mul__(self, other): return SQLOp("*", self, other) def __rmul__(self, other): return SQLOp("*", other, self) def __div__(self, other): return SQLOp("/", self, other) def __rdiv__(self, other): return SQLOp("/", other, self) def __truediv__(self, other): return SQLOp("/", self, other) def __rtruediv__(self, other): return SQLOp("/", other, self) def __floordiv__(self, other): return SQLConstant("FLOOR")(SQLOp("/", self, other)) def __rfloordiv__(self, other): return SQLConstant("FLOOR")(SQLOp("/", other, self)) def __pos__(self): return SQLPrefix("+", self) def __neg__(self): return SQLPrefix("-", self) def __pow__(self, other): return SQLConstant("POW")(self, other) def __rpow__(self, other): return SQLConstant("POW")(other, self) def __abs__(self): return SQLConstant("ABS")(self) def __mod__(self, other): return SQLModulo(self, other) def __rmod__(self, other): return SQLConstant("MOD")(other, self) def __lt__(self, other): return SQLOp("<", self, other) def __le__(self, other): return SQLOp("<=", self, other) def __gt__(self, other): return SQLOp(">", self, other) def __ge__(self, other): return SQLOp(">=", self, other) def __eq__(self, other): if other is None: return ISNULL(self) else: return SQLOp("=", self, other) def __ne__(self, other): if other is None: return ISNOTNULL(self) else: return SQLOp("<>", self, other) def __and__(self, other): return SQLOp("AND", self, other) def __rand__(self, other): return SQLOp("AND", other, self) def __or__(self, other): return SQLOp("OR", self, other) def __ror__(self, other): return SQLOp("OR", other, self) def __invert__(self): return SQLPrefix("NOT", self) def __call__(self, *args): return SQLCall(self, args) def __repr__(self): try: return self.__sqlrepr__(None) except AssertionError: return '<%s %s>' % ( self.__class__.__name__, hex(id(self))[2:]) def __str__(self): return repr(self) def __cmp__(self, other): raise VersionError("Python 2.1+ required") def __rcmp__(self, other): raise VersionError("Python 2.1+ required")
[docs] def startswith(self, s): return STARTSWITH(self, s)
[docs] def endswith(self, s): return ENDSWITH(self, s)
[docs] def contains(self, s): return CONTAINSSTRING(self, s)
[docs] def components(self): return []
[docs] def tablesUsed(self, db): return self.tablesUsedSet(db)
[docs] def tablesUsedSet(self, db): tables = set() for table in self.tablesUsedImmediate(): if hasattr(table, '__sqlrepr__'): table = sqlrepr(table, db) tables.add(table) for component in self.components(): tables.update(tablesUsedSet(component, db)) return tables
[docs] def tablesUsedImmediate(self): return []
####################################### # Converter for SQLExpression instances #######################################
[docs]def SQLExprConverter(value, db): return value.__sqlrepr__()
registerConverter(SQLExpression, SQLExprConverter)
[docs]def tablesUsedSet(obj, db): if hasattr(obj, "tablesUsedSet"): return obj.tablesUsedSet(db) else: return {}
if PY2: div = operator.div else: div = operator.truediv operatorMap = { "+": operator.add, "/": div, "-": operator.sub, "*": operator.mul, "<": operator.lt, "<=": operator.le, "=": operator.eq, "!=": operator.ne, ">=": operator.ge, ">": operator.gt, "IN": operator.contains, "IS": operator.eq, }
[docs]class SQLOp(SQLExpression): def __init__(self, op, expr1, expr2): self.op = op.upper() if isinstance(expr1, Subquery): expr1, expr2 = expr2, expr1 self.expr1 = expr1 self.expr2 = expr2 def __sqlrepr__(self, db): s1 = sqlrepr(self.expr1, db) s2 = sqlrepr(self.expr2, db) if s1[0] != '(' and s1 != 'NULL': s1 = '(' + s1 + ')' if s2[0] != '(' and s2 != 'NULL' and \ not isinstance(self.expr2, Subquery): s2 = '(' + s2 + ')' return "(%s %s %s)" % (s1, self.op, s2)
[docs] def components(self): return [self.expr1, self.expr2]
[docs] def execute(self, executor): if self.op == "AND": return execute(self.expr1, executor) \ and execute(self.expr2, executor) elif self.op == "OR": return execute(self.expr1, executor) \ or execute(self.expr2, executor) else: return operatorMap[self.op.upper()](execute(self.expr1, executor), execute(self.expr2, executor))
registerConverter(SQLOp, SQLExprConverter)
[docs]class SQLModulo(SQLOp): def __init__(self, expr1, expr2): SQLOp.__init__(self, '%', expr1, expr2) def __sqlrepr__(self, db): if db == 'sqlite': return SQLOp.__sqlrepr__(self, db) s1 = sqlrepr(self.expr1, db) s2 = sqlrepr(self.expr2, db) return "MOD(%s, %s)" % (s1, s2)
registerConverter(SQLModulo, SQLExprConverter)
[docs]class SQLCall(SQLExpression): def __init__(self, expr, args): self.expr = expr self.args = args def __sqlrepr__(self, db): return "%s%s" % (sqlrepr(self.expr, db), sqlrepr(self.args, db))
[docs] def components(self): return [self.expr] + list(self.args)
[docs] def execute(self, executor): raise ValueError("I don't yet know how to locally execute functions")
registerConverter(SQLCall, SQLExprConverter)
[docs]class SQLPrefix(SQLExpression): def __init__(self, prefix, expr): self.prefix = prefix self.expr = expr def __sqlrepr__(self, db): return "%s %s" % (self.prefix, sqlrepr(self.expr, db))
[docs] def components(self): return [self.expr]
[docs] def execute(self, executor): prefix = self.prefix expr = execute(self.expr, executor) if prefix == "+": return expr elif prefix == "-": return -expr elif prefix.upper() == "NOT": return not expr
registerConverter(SQLPrefix, SQLExprConverter)
[docs]class SQLConstant(SQLExpression): def __init__(self, const): self.const = const def __sqlrepr__(self, db): return self.const
[docs] def execute(self, executor): raise ValueError("I don't yet know how to execute SQL constants")
registerConverter(SQLConstant, SQLExprConverter)
[docs]class SQLTrueClauseClass(SQLExpression): def __sqlrepr__(self, db): return "1 = 1"
[docs] def execute(self, executor): return 1
SQLTrueClause = SQLTrueClauseClass() registerConverter(SQLTrueClauseClass, SQLExprConverter) ######################################## # Namespaces ########################################
[docs]class Field(SQLExpression): def __init__(self, tableName, fieldName): self.tableName = tableName self.fieldName = fieldName def __sqlrepr__(self, db): return self.tableName + "." + self.fieldName
[docs] def tablesUsedImmediate(self): return [self.tableName]
[docs] def execute(self, executor): return executor.field(self.tableName, self.fieldName)
[docs]class SQLObjectField(Field): def __init__(self, tableName, fieldName, original, soClass, column): Field.__init__(self, tableName, fieldName) self.original = original self.soClass = soClass self.column = column def _from_python(self, value): column = self.column if not isinstance(value, SQLExpression) and \ column and column.from_python: value = column.from_python(value, SQLObjectState(self.soClass)) return value def __eq__(self, other): if other is None: return ISNULL(self) other = self._from_python(other) return SQLOp('=', self, other) def __ne__(self, other): if other is None: return ISNOTNULL(self) other = self._from_python(other) return SQLOp('<>', self, other)
[docs] def startswith(self, s): s = self._from_python(s) return STARTSWITH(self, s)
[docs] def endswith(self, s): s = self._from_python(s) return ENDSWITH(self, s)
[docs] def contains(self, s): s = self._from_python(s) return CONTAINSSTRING(self, s)
registerConverter(SQLObjectField, SQLExprConverter)
[docs]class Table(SQLExpression): FieldClass = Field def __init__(self, tableName): self.tableName = tableName def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) return self.FieldClass(self.tableName, attr) def __sqlrepr__(self, db): return _str_or_sqlrepr(self.tableName, db)
[docs] def execute(self, executor): raise ValueError("Tables don't have values")
[docs]class SQLObjectTable(Table): FieldClass = SQLObjectField def __init__(self, soClass): self.soClass = soClass assert soClass.sqlmeta.table, ( "Bad table name in class %r: %r" % (soClass, soClass.sqlmeta.table)) Table.__init__(self, soClass.sqlmeta.table) def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) if attr == 'id': return self._getattrFromID(attr) elif attr in self.soClass.sqlmeta.columns: column = self.soClass.sqlmeta.columns[attr] return self._getattrFromColumn(column, attr) elif attr + 'ID' in \ [k for (k, v) in self.soClass.sqlmeta.columns.items() if v.foreignKey]: attr += 'ID' column = self.soClass.sqlmeta.columns[attr] return self._getattrFromColumn(column, attr) else: raise AttributeError( "%s instance has no attribute '%s'" % (self.soClass.__name__, attr)) def _getattrFromID(self, attr): return self.FieldClass(self.tableName, self.soClass.sqlmeta.idName, attr, self.soClass, None) def _getattrFromColumn(self, column, attr): return self.FieldClass(self.tableName, column.dbName, attr, self.soClass, column)
[docs]class SQLObjectTableWithJoins(SQLObjectTable): def __getattr__(self, attr): if attr + 'ID' in \ [k for (k, v) in self.soClass.sqlmeta.columns.items() if v.foreignKey]: column = self.soClass.sqlmeta.columns[attr + 'ID'] return self._getattrFromForeignKey(column, attr) elif attr in [x.joinMethodName for x in self.soClass.sqlmeta.joins]: join = [x for x in self.soClass.sqlmeta.joins if x.joinMethodName == attr][0] return self._getattrFromJoin(join, attr) else: return SQLObjectTable.__getattr__(self, attr) def _getattrFromForeignKey(self, column, attr): ret = getattr(self, column.name) == \ getattr(self.soClass, '_SO_class_' + column.foreignKey).q.id return ret def _getattrFromJoin(self, join, attr): if hasattr(join, 'otherColumn'): return AND( join.otherClass.q.id == Field(join.intermediateTable, join.otherColumn), Field(join.intermediateTable, join.joinColumn) == self.soClass.q.id) else: return getattr(join.otherClass.q, join.joinColumn) == \ self.soClass.q.id
[docs]class TableSpace: TableClass = Table def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) return self.TableClass(attr)
[docs]class ConstantSpace: def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) return SQLConstant(attr)
######################################## # Table aliases ########################################
[docs]class AliasField(Field): def __init__(self, tableName, fieldName, alias, aliasTable): Field.__init__(self, tableName, fieldName) self.alias = alias self.aliasTable = aliasTable def __sqlrepr__(self, db): fieldName = self.fieldName if isinstance(fieldName, SQLExpression): fieldName = sqlrepr(fieldName, db) return self.alias + "." + fieldName
[docs] def tablesUsedImmediate(self): return [self.aliasTable]
[docs]class AliasTable(Table): as_string = '' # set it to "AS" if your database requires it FieldClass = AliasField _alias_lock = threading.Lock() _alias_counter = 0 def __init__(self, table, alias=None): if hasattr(table, "sqlmeta"): tableName = SQLConstant(table.sqlmeta.table) elif isinstance(table, (Select, Union)): assert alias is not None, \ "Alias name cannot be constructed from Select instances, " \ "please provide an 'alias' keyword." tableName = Subquery('', table) table = None else: tableName = SQLConstant(table) table = None Table.__init__(self, tableName) self.table = table if alias is None: self._alias_lock.acquire() try: AliasTable._alias_counter += 1 alias = "%s_alias%d" % (tableName, AliasTable._alias_counter) finally: self._alias_lock.release() self.alias = alias def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) if self.table: attr = getattr(self.table.q, attr).fieldName return self.FieldClass(self.tableName, attr, self.alias, self) def __sqlrepr__(self, db): return "%s %s %s" % (sqlrepr(self.tableName, db), self.as_string, self.alias)
[docs]class AliasSQLMeta(): def __init__(self, table, alias): self.__table = table self.__alias = alias def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError("Attribute '%s' is forbidden in '%s'" % ( attr, self.__class__.__name__)) table = self.__table if (attr == "table"): return '%s %s' % (table.sqlmeta.table, self.__alias) return getattr(table.sqlmeta, attr)
[docs]class Alias(SQLExpression): def __init__(self, table, alias=None): self.q = AliasTable(table, alias) def __getattr__(self, attr): table = self.q.table if (attr == "sqlmeta") and hasattr(table, "sqlmeta"): alias = self.q.alias return AliasSQLMeta(table, alias) return getattr(table, attr) def __sqlrepr__(self, db): return sqlrepr(self.q, db)
[docs] def components(self): return [self.q]
[docs] def select(self, clause=None, clauseTables=None, orderBy=NoDefault, limit=None, lazyColumns=False, reversed=False, distinct=False, connection=None, join=None, forUpdate=False): # Import here to avoid circular import from .sresults import SelectResults return SelectResults(self, clause, clauseTables=clauseTables, orderBy=orderBy, limit=limit, lazyColumns=lazyColumns, reversed=reversed, distinct=distinct, connection=connection, join=join, forUpdate=forUpdate)
[docs]class Union(SQLExpression): def __init__(self, *tables): tabs = [] for t in tables: if not isinstance(t, SQLExpression) and hasattr(t, 'sqlmeta'): t = t.sqlmeta.table if isinstance(t, Alias): t = t.q if isinstance(t, Table): t = t.tableName if not isinstance(t, SQLExpression): t = SQLConstant(t) tabs.append(t) self.tables = tabs def __sqlrepr__(self, db): return " UNION ".join([str(sqlrepr(t, db)) for t in self.tables])
######################################## # SQL Statements ########################################
[docs]class Select(SQLExpression): def __init__(self, items=NoDefault, where=NoDefault, groupBy=NoDefault, having=NoDefault, orderBy=NoDefault, limit=NoDefault, join=NoDefault, lazyColumns=False, distinct=False, start=0, end=None, reversed=False, forUpdate=False, clause=NoDefault, staticTables=NoDefault, distinctOn=NoDefault): self.ops = {} if not isinstance(items, (list, tuple, types.GeneratorType)): items = [items] if clause is NoDefault and where is not NoDefault: clause = where if staticTables is NoDefault: staticTables = [] self.ops['items'] = items self.ops['clause'] = clause self.ops['groupBy'] = groupBy self.ops['having'] = having self.ops['orderBy'] = orderBy self.ops['limit'] = limit self.ops['join'] = join self.ops['lazyColumns'] = lazyColumns self.ops['distinct'] = distinct self.ops['distinctOn'] = distinctOn self.ops['start'] = start self.ops['end'] = end self.ops['reversed'] = reversed self.ops['forUpdate'] = forUpdate self.ops['staticTables'] = staticTables
[docs] def clone(self, **newOps): ops = self.ops.copy() ops.update(newOps) return self.__class__(**ops)
[docs] def newItems(self, items): return self.clone(items=items)
[docs] def newClause(self, new_clause): return self.clone(clause=new_clause)
[docs] def orderBy(self, orderBy): return self.clone(orderBy=orderBy)
[docs] def unlimited(self): return self.clone(limit=NoDefault, start=0, end=None)
[docs] def limit(self, limit): self.clone(limit=limit)
[docs] def lazyColumns(self, value): return self.clone(lazyColumns=value)
[docs] def reversed(self): return self.clone(reversed=not self.ops.get('reversed', False))
[docs] def distinct(self): return self.clone(distinct=True)
[docs] def filter(self, filter_clause): if filter_clause is None: # None doesn't filter anything, it's just a no-op: return self clause = self.ops['clause'] if isinstance(clause, string_type): clause = SQLConstant('(%s)' % clause) return self.newClause(AND(clause, filter_clause))
def __sqlrepr__(self, db): select = "SELECT" if self.ops['distinct']: select += " DISTINCT" if self.ops['distinctOn'] is not NoDefault: select += " ON(%s)" % _str_or_sqlrepr( self.ops['distinctOn'], db) if not self.ops['lazyColumns']: select += " %s" % ", ".join( [str(_str_or_sqlrepr(v, db)) for v in self.ops['items']]) else: select += " %s" % _str_or_sqlrepr(self.ops['items'][0], db) join = [] join_str = '' if self.ops['join'] is not NoDefault and self.ops['join'] is not None: _join = self.ops['join'] if isinstance(_join, str): join_str = " " + _join elif isinstance(_join, SQLJoin): join.append(_join) else: join.extend(_join) tables = set() for x in self.ops['staticTables']: if isinstance(x, SQLExpression): x = sqlrepr(x, db) tables.add(x) things = list(self.ops['items']) + join if self.ops['clause'] is not NoDefault: things.append(self.ops['clause']) for thing in things: if isinstance(thing, SQLExpression): tables.update(tablesUsedSet(thing, db)) for j in join: t1 = _str_or_sqlrepr(j.table1, db) if t1 in tables: tables.remove(t1) t2 = _str_or_sqlrepr(j.table2, db) if t2 in tables: tables.remove(t2) if tables: select += " FROM %s" % ", ".join(sorted(tables)) elif join: select += " FROM" tablesYet = tables for j in join: if tablesYet and j.table1: sep = ", " else: sep = " " select += sep + sqlrepr(j, db) tablesYet = True if join_str: select += join_str if self.ops['clause'] is not NoDefault: select += " WHERE %s" % _str_or_sqlrepr(self.ops['clause'], db) if self.ops['groupBy'] is not NoDefault: groupBy = _str_or_sqlrepr(self.ops['groupBy'], db) if isinstance(self.ops['groupBy'], (list, tuple)): groupBy = groupBy[1:-1] # Remove parens select += " GROUP BY %s" % groupBy if self.ops['having'] is not NoDefault: select += " HAVING %s" % _str_or_sqlrepr(self.ops['having'], db) if self.ops['orderBy'] is not NoDefault and \ self.ops['orderBy'] is not None: orderBy = self.ops['orderBy'] if self.ops['reversed']: reverser = DESC else: def reverser(x): return x if isinstance(orderBy, (list, tuple)): select += " ORDER BY %s" % ", ".join( [_str_or_sqlrepr(reverser(_x), db) for _x in orderBy]) else: select += " ORDER BY %s" % _str_or_sqlrepr( reverser(orderBy), db) start, end = self.ops['start'], self.ops['end'] if self.ops['limit'] is not NoDefault: end = start + self.ops['limit'] if start or end: from .dbconnection import dbConnectionForScheme select = dbConnectionForScheme(db)._queryAddLimitOffset(select, start, end) if self.ops['forUpdate']: select += " FOR UPDATE" return select
registerConverter(Select, SQLExprConverter)
[docs]class Insert(SQLExpression): def __init__(self, table, valueList=None, values=None, template=NoDefault): self.template = template self.table = table if valueList: if values: raise TypeError("You may only give valueList *or* values") self.valueList = valueList else: self.valueList = [values] def __sqlrepr__(self, db): if not self.valueList: return '' insert = "INSERT INTO %s" % self.table allowNonDict = True template = self.template if (template is NoDefault) and isinstance(self.valueList[0], dict): template = list(sorted(self.valueList[0].keys())) allowNonDict = False if template is not NoDefault: insert += " (%s)" % ", ".join(template) insert += " VALUES " listToJoin = [] listToJoin_app = listToJoin.append for value in self.valueList: if isinstance(value, dict): if template is NoDefault: raise TypeError( "You can't mix non-dictionaries with dictionaries " "in an INSERT if you don't provide a template (%s)" % repr(value)) value = dictToList(template, value) elif not allowNonDict: raise TypeError( "You can't mix non-dictionaries with dictionaries " "in an INSERT if you don't provide a template (%s)" % repr(value)) listToJoin_app("(%s)" % ", ".join([sqlrepr(v, db) for v in value])) insert = "%s%s" % (insert, ", ".join(listToJoin)) return insert
registerConverter(Insert, SQLExprConverter)
[docs]def dictToList(template, dict): list = [] for key in template: list.append(dict[key]) if len(dict.keys()) > len(template): raise TypeError( "Extra entries in dictionary that aren't asked for in template " "(template=%s, dict=%s)" % (repr(template), repr(dict))) return list
[docs]class Update(SQLExpression): def __init__(self, table, values, template=NoDefault, where=NoDefault): self.table = table self.values = values self.template = template self.whereClause = where def __sqlrepr__(self, db): update = "%s %s" % (self.sqlName(), self.table) update += " SET" first = True if self.template is not NoDefault: for i in range(len(self.template)): if first: first = False else: update += "," update += " %s=%s" % (self.template[i], sqlrepr(self.values[i], db)) else: for key, value in sorted(self.values.items()): if first: first = False else: update += "," update += " %s=%s" % (key, sqlrepr(value, db)) if self.whereClause is not NoDefault: update += " WHERE %s" % _str_or_sqlrepr(self.whereClause, db) return update
[docs] def sqlName(self): return "UPDATE"
registerConverter(Update, SQLExprConverter)
[docs]class Delete(SQLExpression): """To be safe, this will signal an error if there is no where clause, unless you pass in where=None to the constructor.""" def __init__(self, table, where=NoDefault): self.table = table if where is NoDefault: raise TypeError( "You must give a where clause or pass in None " "to indicate no where clause") self.whereClause = where def __sqlrepr__(self, db): whereClause = self.whereClause if whereClause is None: return "DELETE FROM %s" % self.table whereClause = _str_or_sqlrepr(whereClause, db) return "DELETE FROM %s WHERE %s" % (self.table, whereClause)
registerConverter(Delete, SQLExprConverter)
[docs]class Replace(Update):
[docs] def sqlName(self): return "REPLACE"
registerConverter(Replace, SQLExprConverter) ######################################## # SQL Builtins ########################################
[docs]class DESC(SQLExpression): def __init__(self, expr): self.expr = expr def __sqlrepr__(self, db): if isinstance(self.expr, DESC): return sqlrepr(self.expr.expr, db) return '%s DESC' % sqlrepr(self.expr, db)
[docs]def AND(*ops): if not ops: return None op1 = ops[0] ops = ops[1:] if ops: return SQLOp("AND", op1, AND(*ops)) else: return op1
[docs]def OR(*ops): if not ops: return None op1 = ops[0] ops = ops[1:] if ops: return SQLOp("OR", op1, OR(*ops)) else: return op1
[docs]def NOT(op): return SQLPrefix("NOT", op)
def _IN(item, list): return SQLOp("IN", item, list)
[docs]def IN(item, list): from .sresults import SelectResults # Import here to avoid circular import if isinstance(list, SelectResults): query = list.queryForSelect() query.ops['items'] = [list.sourceClass.q.id] list = query if isinstance(list, Select): return INSubquery(item, list) else: return _IN(item, list)
[docs]def NOTIN(item, list): if isinstance(list, Select): return NOTINSubquery(item, list) else: return NOT(_IN(item, list))
[docs]def STARTSWITH(expr, pattern): return LIKE(expr, _LikeQuoted(pattern) + '%', escape='\\')
[docs]def ENDSWITH(expr, pattern): return LIKE(expr, '%' + _LikeQuoted(pattern), escape='\\')
[docs]def CONTAINSSTRING(expr, pattern): return LIKE(expr, '%' + _LikeQuoted(pattern) + '%', escape='\\')
[docs]def ISNULL(expr): return SQLOp("IS", expr, None)
[docs]def ISNOTNULL(expr): return SQLOp("IS NOT", expr, None)
[docs]class ColumnAS(SQLOp): ''' Just like SQLOp('AS', expr, name) except without the parentheses ''' def __init__(self, expr, name): if isinstance(name, string_type): name = SQLConstant(name) SQLOp.__init__(self, 'AS', expr, name) def __sqlrepr__(self, db): return "%s %s %s" % (sqlrepr(self.expr1, db), self.op, sqlrepr(self.expr2, db))
class _LikeQuoted: # It assumes prefix and postfix are strings; usually just a percent sign. # @@: I'm not sure what the quoting rules really are for all the # databases def __init__(self, expr): self.expr = expr self.prefix = '' self.postfix = '' def __radd__(self, s): self.prefix = s + self.prefix return self def __add__(self, s): self.postfix += s return self def __sqlrepr__(self, db): s = self.expr if isinstance(s, SQLExpression): values = [] if self.prefix: values.append(quote_str(self.prefix, db)) s = _quote_like_special(sqlrepr(s, db), db) values.append(s) if self.postfix: values.append(quote_str(self.postfix, db)) if db == "mysql": return "CONCAT(%s)" % ", ".join(values) elif db in ("mssql", "sybase"): return " + ".join(values) else: return " || ".join(values) elif isinstance(s, string_type): s = _quote_like_special(unquote_str(sqlrepr(s, db)), db) return quote_str("%s%s%s" % (self.prefix, s, self.postfix), db) else: raise TypeError( "expected str, unicode or SQLExpression, got %s" % type(s)) def _quote_like_special(s, db): if db == 'postgres': escape = r'\\' else: escape = '\\' s = s.replace('\\', r'\\').\ replace('%', escape + '%').\ replace('_', escape + '_') return s
[docs]class CONCAT(SQLExpression): def __init__(self, *expressions): self.expressions = expressions def __sqlrepr__(self, db): values = [sqlrepr(expr, db) for expr in self.expressions] if db == "mysql": return "CONCAT(%s)" % ", ".join(values) elif db in ("mssql", "sybase"): return " + ".join(values) else: return " || ".join(values)
######################################## # SQL JOINs ########################################
[docs]class SQLJoin(SQLExpression): def __init__(self, table1, table2, op=','): if hasattr(table1, 'sqlmeta'): table1 = table1.sqlmeta.table if hasattr(table2, 'sqlmeta'): table2 = table2.sqlmeta.table if isinstance(table1, str): table1 = SQLConstant(table1) if isinstance(table2, str): table2 = SQLConstant(table2) self.table1 = table1 self.table2 = table2 self.op = op def __sqlrepr__(self, db): if self.table1: return "%s%s %s" % (sqlrepr(self.table1, db), self.op, sqlrepr(self.table2, db)) else: return "%s %s" % (self.op, sqlrepr(self.table2, db))
registerConverter(SQLJoin, SQLExprConverter)
[docs]def JOIN(table1, table2): return SQLJoin(table1, table2, " JOIN")
[docs]def INNERJOIN(table1, table2): return SQLJoin(table1, table2, " INNER JOIN")
[docs]def CROSSJOIN(table1, table2): return SQLJoin(table1, table2, " CROSS JOIN")
[docs]def STRAIGHTJOIN(table1, table2): return SQLJoin(table1, table2, " STRAIGHT JOIN")
[docs]def LEFTJOIN(table1, table2): return SQLJoin(table1, table2, " LEFT JOIN")
[docs]def LEFTOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " LEFT OUTER JOIN")
[docs]def NATURALJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL JOIN")
[docs]def NATURALLEFTJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL LEFT JOIN")
[docs]def NATURALLEFTOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL LEFT OUTER JOIN")
[docs]def RIGHTJOIN(table1, table2): return SQLJoin(table1, table2, " RIGHT JOIN")
[docs]def RIGHTOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " RIGHT OUTER JOIN")
[docs]def NATURALRIGHTJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL RIGHT JOIN")
[docs]def NATURALRIGHTOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL RIGHT OUTER JOIN")
[docs]def FULLJOIN(table1, table2): return SQLJoin(table1, table2, " FULL JOIN")
[docs]def FULLOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " FULL OUTER JOIN")
[docs]def NATURALFULLJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL FULL JOIN")
[docs]def NATURALFULLOUTERJOIN(table1, table2): return SQLJoin(table1, table2, " NATURAL FULL OUTER JOIN")
[docs]class SQLJoinConditional(SQLJoin): """Conditional JOIN""" def __init__(self, table1, table2, op, on_condition=None, using_columns=None): """For condition you must give on_condition or using_columns but not both on_condition can be a string or SQLExpression, for example Table1.q.col1 == Table2.q.col2 using_columns can be a string or a list of columns, e.g. (Table1.q.col1, Table2.q.col2) """ if not on_condition and not using_columns: raise TypeError("You must give ON condition or USING columns") if on_condition and using_columns: raise TypeError( "You must give ON condition or USING columns but not both") SQLJoin.__init__(self, table1, table2, op) self.on_condition = on_condition self.using_columns = using_columns def __sqlrepr__(self, db): if self.on_condition: on_condition = self.on_condition if hasattr(on_condition, "__sqlrepr__"): on_condition = sqlrepr(on_condition, db) join = "%s %s ON %s" % (self.op, sqlrepr(self.table2, db), on_condition) if self.table1: join = "%s %s" % (sqlrepr(self.table1, db), join) return join elif self.using_columns: using_columns = [] for col in self.using_columns: if hasattr(col, "__sqlrepr__"): col = sqlrepr(col, db) using_columns.append(col) using_columns = ", ".join(using_columns) join = "%s %s USING (%s)" % (self.op, sqlrepr(self.table2, db), using_columns) if self.table1: join = "%s %s" % (sqlrepr(self.table1, db), join) return join else: RuntimeError, "Impossible error"
registerConverter(SQLJoinConditional, SQLExprConverter)
[docs]def INNERJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "INNER JOIN", on_condition, using_columns)
[docs]def LEFTJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "LEFT JOIN", on_condition, using_columns)
[docs]def LEFTOUTERJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "LEFT OUTER JOIN", on_condition, using_columns)
[docs]def RIGHTJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "RIGHT JOIN", on_condition, using_columns)
[docs]def RIGHTOUTERJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "RIGHT OUTER JOIN", on_condition, using_columns)
[docs]def FULLJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "FULL JOIN", on_condition, using_columns)
[docs]def FULLOUTERJOINConditional(table1, table2, on_condition=None, using_columns=None): return SQLJoinConditional(table1, table2, "FULL OUTER JOIN", on_condition, using_columns)
[docs]class SQLJoinOn(SQLJoinConditional): """Conditional JOIN ON""" def __init__(self, table1, table2, op, on_condition): SQLJoinConditional.__init__(self, table1, table2, op, on_condition)
registerConverter(SQLJoinOn, SQLExprConverter)
[docs]class SQLJoinUsing(SQLJoinConditional): """Conditional JOIN USING""" def __init__(self, table1, table2, op, using_columns): SQLJoinConditional.__init__(self, table1, table2, op, None, using_columns)
registerConverter(SQLJoinUsing, SQLExprConverter)
[docs]def INNERJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "INNER JOIN", on_condition)
[docs]def LEFTJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "LEFT JOIN", on_condition)
[docs]def LEFTOUTERJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "LEFT OUTER JOIN", on_condition)
[docs]def RIGHTJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "RIGHT JOIN", on_condition)
[docs]def RIGHTOUTERJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "RIGHT OUTER JOIN", on_condition)
[docs]def FULLJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "FULL JOIN", on_condition)
[docs]def FULLOUTERJOINOn(table1, table2, on_condition): return SQLJoinOn(table1, table2, "FULL OUTER JOIN", on_condition)
[docs]def INNERJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "INNER JOIN", using_columns)
[docs]def LEFTJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "LEFT JOIN", using_columns)
[docs]def LEFTOUTERJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "LEFT OUTER JOIN", using_columns)
[docs]def RIGHTJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "RIGHT JOIN", using_columns)
[docs]def RIGHTOUTERJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "RIGHT OUTER JOIN", using_columns)
[docs]def FULLJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "FULL JOIN", using_columns)
[docs]def FULLOUTERJOINUsing(table1, table2, using_columns): return SQLJoinUsing(table1, table2, "FULL OUTER JOIN", using_columns)
######################################## # Subqueries (subselects) ########################################
[docs]class OuterField(SQLObjectField):
[docs] def tablesUsedImmediate(self): return []
[docs]class OuterTable(SQLObjectTable): FieldClass = OuterField
[docs]class Outer: def __init__(self, table): self.q = OuterTable(table)
[docs]class LIKE(SQLExpression): op = "LIKE" def __init__(self, expr, string, escape=None): self.expr = expr self.string = string self.escape = escape def __sqlrepr__(self, db): escape = self.escape like = "%s %s (%s)" % (sqlrepr(self.expr, db), self.op, sqlrepr(self.string, db)) if escape is None: return "(%s)" % like else: return "(%s ESCAPE %s)" % (like, sqlrepr(escape, db))
[docs] def components(self): return [self.expr, self.string]
[docs] def execute(self, executor): if not hasattr(self, '_regex'): # @@: Crude, not entirely accurate dest = self.string dest = dest.replace("%%", "\001") dest = dest.replace("*", "\002") dest = dest.replace("%", "*") dest = dest.replace("\001", "%") dest = dest.replace("\002", "[*]") self._regex = re.compile(fnmatch.translate(dest), re.I) return self._regex.search(execute(self.expr, executor))
[docs]class RLIKE(LIKE): op = "RLIKE" op_db = { 'firebird': 'RLIKE', 'maxdb': 'RLIKE', 'mysql': 'RLIKE', 'postgres': '~', 'sqlite': 'REGEXP' } def _get_op(self, db): return self.op_db.get(db, 'LIKE') def __sqlrepr__(self, db): return "(%s %s (%s))" % ( sqlrepr(self.expr, db), self._get_op(db), sqlrepr(self.string, db) )
[docs] def execute(self, executor): self.op = self._get_op(self.db) return LIKE.execute(self, executor)
[docs]class INSubquery(SQLExpression): op = "IN" def __init__(self, item, subquery): self.item = item self.subquery = subquery
[docs] def components(self): return [self.item]
def __sqlrepr__(self, db): return "%s %s (%s)" % (sqlrepr(self.item, db), self.op, sqlrepr(self.subquery, db))
[docs]class NOTINSubquery(INSubquery): op = "NOT IN"
[docs]class Subquery(SQLExpression): def __init__(self, op, subquery): self.op = op self.subquery = subquery def __sqlrepr__(self, db): return "%s (%s)" % (self.op, sqlrepr(self.subquery, db))
[docs]def EXISTS(subquery): return Subquery("EXISTS", subquery)
[docs]def NOTEXISTS(subquery): return Subquery("NOT EXISTS", subquery)
[docs]def SOME(subquery): return Subquery("SOME", subquery)
[docs]def ANY(subquery): return Subquery("ANY", subquery)
[docs]def ALL(subquery): return Subquery("ALL", subquery)
####
[docs]class ImportProxyField(SQLObjectField):
[docs] def tablesUsedImmediate(self): return [str(self.tableName)]
[docs]class ImportProxy(SQLExpression): ''' Class to be used in column definitions that rely on other tables that might not yet be in a classregistry ''' FieldClass = ImportProxyField def __init__(self, clsName, registry=None): self.tableName = _DelayClass(self, clsName) self.sqlmeta = _Delay_proxy(table=_DelayClass(self, clsName)) self.q = self self.soClass = None classregistry.registry(registry).addClassCallback( clsName, lambda foreign, me: setattr(me, 'soClass', foreign), self) def __nonzero__(self): return True __bool__ = __nonzero__ def __getattr__(self, attr): if self.soClass is None: return _Delay(self, attr) return getattr(self.soClass.q, attr)
class _Delay(SQLExpression): def __init__(self, proxy, attr): self.attr = attr self.proxy = proxy def __sqlrepr__(self, db): if self.proxy.soClass is None: return '_DELAYED_' + self.attr val = self._resolve() if isinstance(val, SQLExpression): val = sqlrepr(val, db) return val def tablesUsedImmediate(self): return getattr(self._resolve(), 'tablesUsedImmediate', lambda: [])() def components(self): return getattr(self._resolve(), 'components', lambda: [])() def _resolve(self): return getattr(self.proxy, self.attr) # For AliasTable etc def fieldName(self): class _aliasFieldName(SQLExpression): def __init__(self, proxy): self.proxy = proxy def __sqlrepr__(self, db): return self.proxy._resolve().fieldName return _aliasFieldName(self) fieldName = property(fieldName) class _DelayClass(_Delay): def _resolve(self): return self.proxy.soClass.sqlmeta.table class _Delay_proxy(object): def __init__(self, **kw): self.__dict__.update(kw) ###### ######################################## # Global initializations ######################################## table = TableSpace() const = ConstantSpace() func = const ######################################## # Testing ######################################## if __name__ == "__main__": tests = """ >>> AND(table.address.name == "Ian Bicking", table.address.zip > 30000) >>> table.address.name >>> AND(LIKE(table.address.name, "this"), IN(table.address.zip, [100, 200, 300])) >>> Select([table.address.name, table.address.state], where=LIKE(table.address.name, "%ian%")) >>> Select([table.user.name], where=AND(table.user.state == table.states.abbrev)) >>> Insert(table.address, [{"name": "BOB", "address": "3049 N. 18th St."}, {"name": "TIM", "address": "409 S. 10th St."}]) >>> Insert(table.address, [("BOB", "3049 N. 18th St."), ("TIM", "409 S. 10th St.")], template=('name', 'address')) >>> Delete(table.address, where="BOB"==table.address.name) >>> Update(table.address, {"lastModified": const.NOW()}) >>> Replace(table.address, [("BOB", "3049 N. 18th St."), ("TIM", "409 S. 10th St.")], template=('name', 'address')) """ # noqa: allow long (> 79) lines for expr in tests.split('\n'): if not expr.strip(): continue if expr.startswith('>>> '): expr = expr[4:]