diff --git a/MySQLdb/_mysql_exceptions.py b/MySQLdb/_mysql_exceptions.py index 74b765a..99a79d7 100644 --- a/MySQLdb/_mysql_exceptions.py +++ b/MySQLdb/_mysql_exceptions.py @@ -13,42 +13,35 @@ except ImportError: class MySQLError(StandardError): - """Exception related to operation with MySQL.""" class Warning(Warning, MySQLError): - """Exception raised for important warnings like data truncations while inserting, etc.""" class Error(MySQLError): - """Exception that is the base class of all other error exceptions (not Warning).""" class InterfaceError(Error): - """Exception raised for errors that are related to the database interface rather than the database itself.""" class DatabaseError(Error): - """Exception raised for errors that are related to the database.""" class DataError(DatabaseError): - """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range, etc.""" class OperationalError(DatabaseError): - """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not @@ -57,31 +50,25 @@ class OperationalError(DatabaseError): class IntegrityError(DatabaseError): - """Exception raised when the relational integrity of the database is affected, e.g. a foreign key check fails, duplicate key, etc.""" class InternalError(DatabaseError): - """Exception raised when the database encounters an internal error, e.g. the cursor is not valid anymore, the transaction is out of sync, etc.""" class ProgrammingError(DatabaseError): - """Exception raised for programming errors, e.g. table not found or already exists, syntax error in the SQL statement, wrong number of parameters specified, etc.""" class NotSupportedError(DatabaseError): - """Exception raised in case a method or database API was used which is not supported by the database, e.g. requesting a .rollback() on a connection that does not support transaction or has transactions turned off.""" - - diff --git a/doc/FAQ.rst b/doc/FAQ.rst index 21e00b9..14c8f72 100644 --- a/doc/FAQ.rst +++ b/doc/FAQ.rst @@ -26,7 +26,7 @@ probably the issue, but it shouldn't happen any more. ImportError ----------- - ImportError: No module named _mysql + ImportError: No module named _mysql If you see this, it's likely you did some wrong when installing MySQLdb; re-read (or read) README. _mysql is the low-level C module @@ -42,7 +42,7 @@ still have to edit a configuration file so that the setup knows where to find MySQL and what libraries to include. - ImportError: libmysqlclient_r.so.14: cannot open shared object file: No such file or directory + ImportError: libmysqlclient_r.so.14: cannot open shared object file: No such file or directory The number after .so may vary, but this means you have a version of MySQLdb compiled against one version of MySQL, and are now trying to @@ -67,7 +67,7 @@ Solutions: `_. - ImportError: ld.so.1: python: fatal: libmtmalloc.so.1: DF_1_NOOPEN tagged object may not be dlopen()'ed + ImportError: ld.so.1: python: fatal: libmtmalloc.so.1: DF_1_NOOPEN tagged object may not be dlopen()'ed This is a weird one from Solaris. What does it mean? I have no idea. However, things like this can happen if there is some sort of a compiler @@ -79,9 +79,9 @@ different vendors. Solution: Rebuild Python or MySQL (or maybe both) from source. - ImportError: dlopen(./_mysql.so, 2): Symbol not found: _sprintf$LDBLStub - Referenced from: ./_mysql.so - Expected in: dynamic lookup + ImportError: dlopen(./_mysql.so, 2): Symbol not found: _sprintf$LDBLStub + Referenced from: ./_mysql.so + Expected in: dynamic lookup This is one from Mac OS X. It seems to have been a compiler mismatch, but this time between two different versions of GCC. It seems nearly @@ -110,7 +110,7 @@ rolled back, and they cause pending transactions to commit. Other Errors ------------ - OperationalError: (1251, 'Client does not support authentication protocol requested by server; consider upgrading MySQL client') + OperationalError: (1251, 'Client does not support authentication protocol requested by server; consider upgrading MySQL client') This means your server and client libraries are not the same version. More specifically, it probably means you have a 4.1 or newer server diff --git a/doc/MySQLdb.constants.rst b/doc/MySQLdb.constants.rst index e28dee2..9276f8f 100644 --- a/doc/MySQLdb.constants.rst +++ b/doc/MySQLdb.constants.rst @@ -1,59 +1,59 @@ -constants Package -================= - -:mod:`constants` Package ------------------------- - -.. automodule:: MySQLdb.constants - :members: - :undoc-members: - :show-inheritance: - -:mod:`CLIENT` Module --------------------- - -.. automodule:: MySQLdb.constants.CLIENT - :members: - :undoc-members: - :show-inheritance: - -:mod:`CR` Module ----------------- - -.. automodule:: MySQLdb.constants.CR - :members: - :undoc-members: - :show-inheritance: - -:mod:`ER` Module ----------------- - -.. automodule:: MySQLdb.constants.ER - :members: - :undoc-members: - :show-inheritance: - -:mod:`FIELD_TYPE` Module ------------------------- - -.. automodule:: MySQLdb.constants.FIELD_TYPE - :members: - :undoc-members: - :show-inheritance: - -:mod:`FLAG` Module ------------------- - -.. automodule:: MySQLdb.constants.FLAG - :members: - :undoc-members: - :show-inheritance: - -:mod:`REFRESH` Module ---------------------- - -.. automodule:: MySQLdb.constants.REFRESH - :members: - :undoc-members: - :show-inheritance: - +constants Package +================= + +:mod:`constants` Package +------------------------ + +.. automodule:: MySQLdb.constants + :members: + :undoc-members: + :show-inheritance: + +:mod:`CLIENT` Module +-------------------- + +.. automodule:: MySQLdb.constants.CLIENT + :members: + :undoc-members: + :show-inheritance: + +:mod:`CR` Module +---------------- + +.. automodule:: MySQLdb.constants.CR + :members: + :undoc-members: + :show-inheritance: + +:mod:`ER` Module +---------------- + +.. automodule:: MySQLdb.constants.ER + :members: + :undoc-members: + :show-inheritance: + +:mod:`FIELD_TYPE` Module +------------------------ + +.. automodule:: MySQLdb.constants.FIELD_TYPE + :members: + :undoc-members: + :show-inheritance: + +:mod:`FLAG` Module +------------------ + +.. automodule:: MySQLdb.constants.FLAG + :members: + :undoc-members: + :show-inheritance: + +:mod:`REFRESH` Module +--------------------- + +.. automodule:: MySQLdb.constants.REFRESH + :members: + :undoc-members: + :show-inheritance: + diff --git a/doc/_mysql_exceptions.rst b/doc/_mysql_exceptions.rst index 9b65de3..2d43525 100644 --- a/doc/_mysql_exceptions.rst +++ b/doc/_mysql_exceptions.rst @@ -1,7 +1,7 @@ -_mysql_exceptions Module -======================== - -.. automodule:: _mysql_exceptions - :members: - :undoc-members: - :show-inheritance: +_mysql_exceptions Module +======================== + +.. automodule:: _mysql_exceptions + :members: + :undoc-members: + :show-inheritance: diff --git a/doc/modules.rst b/doc/modules.rst index 7cf3faa..998ac46 100644 --- a/doc/modules.rst +++ b/doc/modules.rst @@ -1,7 +1,7 @@ -MySQLdb -======= - -.. toctree:: - :maxdepth: 4 - - MySQLdb +MySQLdb +======= + +.. toctree:: + :maxdepth: 4 + + MySQLdb diff --git a/setup_windows.py b/setup_windows.py index 0811e12..5a4d236 100644 --- a/setup_windows.py +++ b/setup_windows.py @@ -47,4 +47,3 @@ def get_config(): if __name__ == "__main__": sys.stderr.write("""You shouldn't be running this directly; it is used by setup.py.""") - diff --git a/tests/capabilities.py b/tests/capabilities.py index c341b3c..5d91379 100644 --- a/tests/capabilities.py +++ b/tests/capabilities.py @@ -1,298 +1,298 @@ -#!/usr/bin/env python -O -""" Script to test database capabilities and the DB-API interface - for functionality and memory leaks. - - Adapted from a script by M-A Lemburg. - -""" -from time import time -import array -import unittest -from configdb import connection_factory - -from MySQLdb.compat import unichr - - -class DatabaseTest(unittest.TestCase): - - db_module = None - connect_args = () - connect_kwargs = dict() - create_table_extra = '' - rows = 10 - debug = False - - def setUp(self): - import gc - db = connection_factory(**self.connect_kwargs) - self.connection = db - self.cursor = db.cursor() - self.BLOBUText = u''.join([unichr(i) for i in range(16384)]) - self.BLOBBinary = self.db_module.Binary((u''.join([unichr(i) for i in range(256)] * 16)).encode('latin1')) - - leak_test = True - - def tearDown(self): - if self.leak_test: - import gc - del self.cursor - orphans = gc.collect() - self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans) - - del self.connection - orphans = gc.collect() - self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans) - - def table_exists(self, name): - try: - self.cursor.execute('select * from %s where 1=0' % name) - except: - return False - else: - return True - - def quote_identifier(self, ident): - return '"%s"' % ident - - def new_table_name(self): - i = id(self.cursor) - while True: - name = self.quote_identifier('tb%08x' % i) - if not self.table_exists(name): - return name - i = i + 1 - - def create_table(self, columndefs): - - """ Create a table using a list of column definitions given in - columndefs. - - generator must be a function taking arguments (row_number, - col_number) returning a suitable data object for insertion - into the table. - - """ - self.table = self.new_table_name() - self.cursor.execute('CREATE TABLE %s (%s) %s' % - (self.table, - ',\n'.join(columndefs), - self.create_table_extra)) - - def check_data_integrity(self, columndefs, generator): - # insert - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - self.connection.commit() - # verify - self.cursor.execute('select * from %s' % self.table) - l = self.cursor.fetchall() - self.assertEqual(len(l), self.rows) - try: - for i in range(self.rows): - for j in range(len(columndefs)): - self.assertEqual(l[i][j], generator(i,j)) - finally: - if not self.debug: - self.cursor.execute('drop table %s' % (self.table)) - - def test_transactions(self): - columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') - def generator(row, col): - if col == 0: return row - else: return ('%i' % (row%10))*255 - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - # verify - self.connection.commit() - self.cursor.execute('select * from %s' % self.table) - l = self.cursor.fetchall() - self.assertEqual(len(l), self.rows) - for i in range(self.rows): - for j in range(len(columndefs)): - self.assertEqual(l[i][j], generator(i,j)) - delete_statement = 'delete from %s where col1=%%s' % self.table - self.cursor.execute(delete_statement, (0,)) - self.cursor.execute('select col1 from %s where col1=%s' % \ - (self.table, 0)) - l = self.cursor.fetchall() - self.assertFalse(l, "DELETE didn't work") - self.connection.rollback() - self.cursor.execute('select col1 from %s where col1=%s' % \ - (self.table, 0)) - l = self.cursor.fetchall() - self.assertTrue(len(l) == 1, "ROLLBACK didn't work") - self.cursor.execute('drop table %s' % (self.table)) - - def test_truncation(self): - columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') - def generator(row, col): - if col == 0: return row - else: return ('%i' % (row%10))*((255-self.rows//2)+row) - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - - try: - self.cursor.execute(insert_statement, (0, '0'*256)) - except self.connection.DataError: - pass - else: - self.fail("Over-long column did not generate warnings/exception with single insert") - - self.connection.rollback() - - try: - for i in range(self.rows): - data = [] - for j in range(len(columndefs)): - data.append(generator(i,j)) - self.cursor.execute(insert_statement,tuple(data)) - except self.connection.DataError: - pass - else: - self.fail("Over-long columns did not generate warnings/exception with execute()") - - self.connection.rollback() - - try: - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - except self.connection.DataError: - pass - else: - self.fail("Over-long columns did not generate warnings/exception with executemany()") - - self.connection.rollback() - self.cursor.execute('drop table %s' % (self.table)) - - def test_CHAR(self): - # Character data - def generator(row,col): - return ('%i' % ((row+col) % 10)) * 255 - self.check_data_integrity( - ('col1 char(255)','col2 char(255)'), - generator) - - def test_INT(self): - # Number data - def generator(row,col): - return row*row - self.check_data_integrity( - ('col1 INT',), - generator) - - def test_DECIMAL(self): - # DECIMAL - from decimal import Decimal - def generator(row,col): - return Decimal("%d.%02d" % (row, col)) - self.check_data_integrity( - ('col1 DECIMAL(5,2)',), - generator) - - val = Decimal('1.11111111111111119E-7') - self.cursor.execute('SELECT %s', (val,)) - result = self.cursor.fetchone()[0] - self.assertEqual(result, val) - self.assertIsInstance(result, Decimal) - - self.cursor.execute('SELECT %s + %s', (Decimal('0.1'), Decimal('0.2'))) - result = self.cursor.fetchone()[0] - self.assertEqual(result, Decimal('0.3')) - self.assertIsInstance(result, Decimal) - - def test_DATE(self): - ticks = time() - def generator(row,col): - return self.db_module.DateFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 DATE',), - generator) - - def test_TIME(self): - ticks = time() - def generator(row,col): - return self.db_module.TimeFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 TIME',), - generator) - - def test_DATETIME(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 DATETIME',), - generator) - - def test_TIMESTAMP(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 TIMESTAMP',), - generator) - - def test_fractional_TIMESTAMP(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0) - self.check_data_integrity( - ('col1 TIMESTAMP',), - generator) - - def test_LONG(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBUText # 'BLOB Text ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 LONG'), - generator) - - def test_TEXT(self): - def generator(row,col): - return self.BLOBUText # 'BLOB Text ' * 1024 - self.check_data_integrity( - ('col2 TEXT',), - generator) - - def test_LONG_BYTE(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBBinary # 'BLOB\000Binary ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 LONG BYTE'), - generator) - - def test_BLOB(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBBinary # 'BLOB\000Binary ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 BLOB'), - generator) - - def test_DOUBLE(self): - for val in (18014398509481982.0, 0.1): - self.cursor.execute('SELECT %s', (val,)); - result = self.cursor.fetchone()[0] - self.assertEqual(result, val) - self.assertIsInstance(result, float) +#!/usr/bin/env python -O +""" Script to test database capabilities and the DB-API interface + for functionality and memory leaks. + + Adapted from a script by M-A Lemburg. + +""" +from time import time +import array +import unittest +from configdb import connection_factory + +from MySQLdb.compat import unichr + + +class DatabaseTest(unittest.TestCase): + + db_module = None + connect_args = () + connect_kwargs = dict() + create_table_extra = '' + rows = 10 + debug = False + + def setUp(self): + import gc + db = connection_factory(**self.connect_kwargs) + self.connection = db + self.cursor = db.cursor() + self.BLOBUText = u''.join([unichr(i) for i in range(16384)]) + self.BLOBBinary = self.db_module.Binary((u''.join([unichr(i) for i in range(256)] * 16)).encode('latin1')) + + leak_test = True + + def tearDown(self): + if self.leak_test: + import gc + del self.cursor + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans) + + del self.connection + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans) + + def table_exists(self, name): + try: + self.cursor.execute('select * from %s where 1=0' % name) + except: + return False + else: + return True + + def quote_identifier(self, ident): + return '"%s"' % ident + + def new_table_name(self): + i = id(self.cursor) + while True: + name = self.quote_identifier('tb%08x' % i) + if not self.table_exists(name): + return name + i = i + 1 + + def create_table(self, columndefs): + + """ Create a table using a list of column definitions given in + columndefs. + + generator must be a function taking arguments (row_number, + col_number) returning a suitable data object for insertion + into the table. + + """ + self.table = self.new_table_name() + self.cursor.execute('CREATE TABLE %s (%s) %s' % + (self.table, + ',\n'.join(columndefs), + self.create_table_extra)) + + def check_data_integrity(self, columndefs, generator): + # insert + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + self.connection.commit() + # verify + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEqual(len(l), self.rows) + try: + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEqual(l[i][j], generator(i,j)) + finally: + if not self.debug: + self.cursor.execute('drop table %s' % (self.table)) + + def test_transactions(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*255 + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + # verify + self.connection.commit() + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEqual(len(l), self.rows) + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEqual(l[i][j], generator(i,j)) + delete_statement = 'delete from %s where col1=%%s' % self.table + self.cursor.execute(delete_statement, (0,)) + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertFalse(l, "DELETE didn't work") + self.connection.rollback() + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertTrue(len(l) == 1, "ROLLBACK didn't work") + self.cursor.execute('drop table %s' % (self.table)) + + def test_truncation(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*((255-self.rows//2)+row) + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + + try: + self.cursor.execute(insert_statement, (0, '0'*256)) + except self.connection.DataError: + pass + else: + self.fail("Over-long column did not generate warnings/exception with single insert") + + self.connection.rollback() + + try: + for i in range(self.rows): + data = [] + for j in range(len(columndefs)): + data.append(generator(i,j)) + self.cursor.execute(insert_statement,tuple(data)) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with execute()") + + self.connection.rollback() + + try: + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with executemany()") + + self.connection.rollback() + self.cursor.execute('drop table %s' % (self.table)) + + def test_CHAR(self): + # Character data + def generator(row,col): + return ('%i' % ((row+col) % 10)) * 255 + self.check_data_integrity( + ('col1 char(255)','col2 char(255)'), + generator) + + def test_INT(self): + # Number data + def generator(row,col): + return row*row + self.check_data_integrity( + ('col1 INT',), + generator) + + def test_DECIMAL(self): + # DECIMAL + from decimal import Decimal + def generator(row,col): + return Decimal("%d.%02d" % (row, col)) + self.check_data_integrity( + ('col1 DECIMAL(5,2)',), + generator) + + val = Decimal('1.11111111111111119E-7') + self.cursor.execute('SELECT %s', (val,)) + result = self.cursor.fetchone()[0] + self.assertEqual(result, val) + self.assertIsInstance(result, Decimal) + + self.cursor.execute('SELECT %s + %s', (Decimal('0.1'), Decimal('0.2'))) + result = self.cursor.fetchone()[0] + self.assertEqual(result, Decimal('0.3')) + self.assertIsInstance(result, Decimal) + + def test_DATE(self): + ticks = time() + def generator(row,col): + return self.db_module.DateFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATE',), + generator) + + def test_TIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimeFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_DATETIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATETIME',), + generator) + + def test_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_fractional_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_LONG(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG'), + generator) + + def test_TEXT(self): + def generator(row,col): + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col2 TEXT',), + generator) + + def test_LONG_BYTE(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG BYTE'), + generator) + + def test_BLOB(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 BLOB'), + generator) + + def test_DOUBLE(self): + for val in (18014398509481982.0, 0.1): + self.cursor.execute('SELECT %s', (val,)); + result = self.cursor.fetchone()[0] + self.assertEqual(result, val) + self.assertIsInstance(result, float) diff --git a/tests/dbapi20.py b/tests/dbapi20.py index 106ea40..79c188a 100644 --- a/tests/dbapi20.py +++ b/tests/dbapi20.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -''' Python DB API 2.0 driver compliance unit test suite. - +''' Python DB API 2.0 driver compliance unit test suite. + This software is Public Domain and may be used without restrictions. "Now we have booze and barflies entering the discussion, plus rumours of @@ -67,8 +67,8 @@ import time class DatabaseAPI20Test(unittest.TestCase): ''' Test a database self.driver for DB API 2.0 compatibility. This implementation tests Gadfly, but the TestCase - is structured so that other self.drivers can subclass this - test case to ensure compiliance with the DB-API. It is + is structured so that other self.drivers can subclass this + test case to ensure compiliance with the DB-API. It is expected that this TestCase may be expanded in the future if ambiguities or edge conditions are discovered. @@ -78,9 +78,9 @@ class DatabaseAPI20Test(unittest.TestCase): self.driver, connect_args and connect_kw_args. Class specification should be as follows: - import dbapi20 + import dbapi20 class mytest(dbapi20.DatabaseAPI20Test): - [...] + [...] Don't 'import DatabaseAPI20Test from dbapi20', or you will confuse the unit tester - just 'import dbapi20'. @@ -99,7 +99,7 @@ class DatabaseAPI20Test(unittest.TestCase): xddl2 = 'drop table %sbarflys' % table_prefix lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase - + # Some drivers may need to override these helpers, for example adding # a 'commit' after the execute. def executeDDL1(self,cursor): @@ -123,10 +123,10 @@ class DatabaseAPI20Test(unittest.TestCase): try: cur = con.cursor() for ddl in (self.xddl1,self.xddl2): - try: + try: cur.execute(ddl) con.commit() - except self.driver.Error: + except self.driver.Error: # Assume table didn't exist. Other tests will check if # execute is busted. pass @@ -238,7 +238,7 @@ class DatabaseAPI20Test(unittest.TestCase): con.rollback() except self.driver.NotSupportedError: pass - + def test_cursor(self): con = self._connect() try: @@ -392,7 +392,7 @@ class DatabaseAPI20Test(unittest.TestCase): ) elif self.driver.paramstyle == 'named': cur.execute( - 'insert into %sbooze values (:beer)' % self.table_prefix, + 'insert into %sbooze values (:beer)' % self.table_prefix, {'beer':"Cooper's"} ) elif self.driver.paramstyle == 'format': @@ -532,7 +532,7 @@ class DatabaseAPI20Test(unittest.TestCase): tests. ''' populate = [ - "insert into %sbooze values ('%s')" % (self.table_prefix,s) + "insert into %sbooze values ('%s')" % (self.table_prefix,s) for s in self.samples ] return populate @@ -593,7 +593,7 @@ class DatabaseAPI20Test(unittest.TestCase): self.assertEqual(len(rows),6) rows = [r[0] for r in rows] rows.sort() - + # Make sure we get the right data back out for i in range(0,6): self.assertEqual(rows[i],self.samples[i], @@ -664,10 +664,10 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchall should return an empty list if ' 'a select query returns no rows' ) - + finally: con.close() - + def test_mixedfetch(self): con = self._connect() try: @@ -703,7 +703,7 @@ class DatabaseAPI20Test(unittest.TestCase): def help_nextset_setUp(self,cur): ''' Should create a procedure called deleteme - that returns two result sets, first the + that returns two result sets, first the number of rows in booze then "name from booze" ''' raise NotImplementedError('Helper not implemented') diff --git a/tests/test_MySQLdb_capabilities.py b/tests/test_MySQLdb_capabilities.py index e9b0e2a..6e39d14 100644 --- a/tests/test_MySQLdb_capabilities.py +++ b/tests/test_MySQLdb_capabilities.py @@ -1,186 +1,186 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import capabilities -from datetime import timedelta -from contextlib import closing -import unittest -import MySQLdb -from MySQLdb.compat import unicode -from MySQLdb import cursors -from configdb import connection_factory -import warnings - - -warnings.filterwarnings('ignore') - - -class test_MySQLdb(capabilities.DatabaseTest): - - db_module = MySQLdb - connect_args = () - connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") - create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8" - leak_test = False - - def quote_identifier(self, ident): - return "`%s`" % ident - - def test_TIME(self): - def generator(row,col): - return timedelta(0, row*8000) - self.check_data_integrity( - ('col1 TIME',), - generator) - - def test_TINYINT(self): - # Number data - def generator(row, col): - v = (row*row) % 256 - if v > 127: - v = v-256 - return v - self.check_data_integrity( - ('col1 TINYINT',), - generator) - - def test_stored_procedures(self): - db = self.connection - c = self.cursor - self.create_table(('pos INT', 'tree CHAR(20)')) - c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table, - list(enumerate('ash birch cedar Lärche pine'.split()))) - db.commit() - - c.execute(""" - CREATE PROCEDURE test_sp(IN t VARCHAR(255)) - BEGIN - SELECT pos FROM %s WHERE tree = t; - END - """ % self.table) - db.commit() - - c.callproc('test_sp', ('Lärche',)) - rows = c.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0][0], 3) - c.nextset() - - c.execute("DROP PROCEDURE test_sp") - c.execute('drop table %s' % (self.table)) - - def test_small_CHAR(self): - # Character data - def generator(row,col): - i = (row*col+62)%256 - if i == 62: return '' - if i == 63: return None - return chr(i) - self.check_data_integrity( - ('col1 char(1)','col2 char(1)'), - generator) - - def test_BIT(self): - c = self.cursor - try: - c.execute("""create table test_BIT ( - b3 BIT(3), - b7 BIT(10), - b64 BIT(64))""") - - one64 = '1'*64 - c.execute( - "insert into test_BIT (b3, b7, b64)" - " VALUES (b'011', b'1111111111', b'%s')" - % one64) - - c.execute("SELECT b3, b7, b64 FROM test_BIT") - row = c.fetchone() - self.assertEqual(row[0], b'\x03') - self.assertEqual(row[1], b'\x03\xff') - self.assertEqual(row[2], b'\xff'*8) - finally: - c.execute("drop table if exists test_BIT") - - def test_MULTIPOLYGON(self): - c = self.cursor - try: - c.execute("""create table test_MULTIPOLYGON ( - id INTEGER PRIMARY KEY, - border MULTIPOLYGON)""") - - c.execute( - "insert into test_MULTIPOLYGON (id, border)" - " VALUES (1, GeomFromText('MULTIPOLYGON(((1 1, 1 -1, -1 -1, -1 1, 1 1)),((1 1, 3 1, 3 3, 1 3, 1 1)))'))" - ) - - c.execute("SELECT id, AsText(border) FROM test_MULTIPOLYGON") - row = c.fetchone() - self.assertEqual(row[0], 1) - self.assertEqual(row[1], 'MULTIPOLYGON(((1 1,1 -1,-1 -1,-1 1,1 1)),((1 1,3 1,3 3,1 3,1 1)))') - - c.execute("SELECT id, AsWKB(border) FROM test_MULTIPOLYGON") - row = c.fetchone() - self.assertEqual(row[0], 1) - self.assertNotEqual(len(row[1]), 0) - - c.execute("SELECT id, border FROM test_MULTIPOLYGON") - row = c.fetchone() - self.assertEqual(row[0], 1) - self.assertNotEqual(len(row[1]), 0) - finally: - c.execute("drop table if exists test_MULTIPOLYGON") - - def test_bug_2671682(self): - from MySQLdb.constants import ER - try: - self.cursor.execute("describe some_non_existent_table"); - except self.connection.ProgrammingError as msg: - self.assertTrue(str(ER.NO_SUCH_TABLE) in str(msg)) - - def test_bug_3514287(self): - c = self.cursor - try: - c.execute("""create table bug_3541287 ( - c1 CHAR(10), - t1 TIMESTAMP)""") - c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", - ("blah",)) - finally: - c.execute("drop table if exists bug_3541287") - - def test_ping(self): - self.connection.ping() - - def test_reraise_exception(self): - c = self.cursor - try: - c.execute("SELECT x FROM not_existing_table") - except MySQLdb.ProgrammingError as e: - self.assertEqual(e.args[0], 1146) - return - self.fail("Should raise ProgrammingError") - - def test_binary_prefix(self): - # verify prefix behaviour when enabled, disabled and for default (disabled) - for binary_prefix in (True, False, None): - kwargs = self.connect_kwargs.copy() - # needs to be set to can guarantee CHARSET response for normal strings - kwargs['charset'] = 'utf8' - if binary_prefix != None: - kwargs['binary_prefix'] = binary_prefix - - with closing(connection_factory(**kwargs)) as conn: - with closing(conn.cursor()) as c: - c.execute('SELECT CHARSET(%s)', (MySQLdb.Binary(b'raw bytes'),)) - self.assertEqual(c.fetchall()[0][0], 'binary' if binary_prefix else 'utf8') - # normal strings should not get prefix - c.execute('SELECT CHARSET(%s)', ('str',)) - self.assertEqual(c.fetchall()[0][0], 'utf8') - - -if __name__ == '__main__': - if test_MySQLdb.leak_test: - import gc - gc.enable() - gc.set_debug(gc.DEBUG_LEAK) - unittest.main() +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import capabilities +from datetime import timedelta +from contextlib import closing +import unittest +import MySQLdb +from MySQLdb.compat import unicode +from MySQLdb import cursors +from configdb import connection_factory +import warnings + + +warnings.filterwarnings('ignore') + + +class test_MySQLdb(capabilities.DatabaseTest): + + db_module = MySQLdb + connect_args = () + connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") + create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8" + leak_test = False + + def quote_identifier(self, ident): + return "`%s`" % ident + + def test_TIME(self): + def generator(row,col): + return timedelta(0, row*8000) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_TINYINT(self): + # Number data + def generator(row, col): + v = (row*row) % 256 + if v > 127: + v = v-256 + return v + self.check_data_integrity( + ('col1 TINYINT',), + generator) + + def test_stored_procedures(self): + db = self.connection + c = self.cursor + self.create_table(('pos INT', 'tree CHAR(20)')) + c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table, + list(enumerate('ash birch cedar Lärche pine'.split()))) + db.commit() + + c.execute(""" + CREATE PROCEDURE test_sp(IN t VARCHAR(255)) + BEGIN + SELECT pos FROM %s WHERE tree = t; + END + """ % self.table) + db.commit() + + c.callproc('test_sp', ('Lärche',)) + rows = c.fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0][0], 3) + c.nextset() + + c.execute("DROP PROCEDURE test_sp") + c.execute('drop table %s' % (self.table)) + + def test_small_CHAR(self): + # Character data + def generator(row,col): + i = (row*col+62)%256 + if i == 62: return '' + if i == 63: return None + return chr(i) + self.check_data_integrity( + ('col1 char(1)','col2 char(1)'), + generator) + + def test_BIT(self): + c = self.cursor + try: + c.execute("""create table test_BIT ( + b3 BIT(3), + b7 BIT(10), + b64 BIT(64))""") + + one64 = '1'*64 + c.execute( + "insert into test_BIT (b3, b7, b64)" + " VALUES (b'011', b'1111111111', b'%s')" + % one64) + + c.execute("SELECT b3, b7, b64 FROM test_BIT") + row = c.fetchone() + self.assertEqual(row[0], b'\x03') + self.assertEqual(row[1], b'\x03\xff') + self.assertEqual(row[2], b'\xff'*8) + finally: + c.execute("drop table if exists test_BIT") + + def test_MULTIPOLYGON(self): + c = self.cursor + try: + c.execute("""create table test_MULTIPOLYGON ( + id INTEGER PRIMARY KEY, + border MULTIPOLYGON)""") + + c.execute( + "insert into test_MULTIPOLYGON (id, border)" + " VALUES (1, GeomFromText('MULTIPOLYGON(((1 1, 1 -1, -1 -1, -1 1, 1 1)),((1 1, 3 1, 3 3, 1 3, 1 1)))'))" + ) + + c.execute("SELECT id, AsText(border) FROM test_MULTIPOLYGON") + row = c.fetchone() + self.assertEqual(row[0], 1) + self.assertEqual(row[1], 'MULTIPOLYGON(((1 1,1 -1,-1 -1,-1 1,1 1)),((1 1,3 1,3 3,1 3,1 1)))') + + c.execute("SELECT id, AsWKB(border) FROM test_MULTIPOLYGON") + row = c.fetchone() + self.assertEqual(row[0], 1) + self.assertNotEqual(len(row[1]), 0) + + c.execute("SELECT id, border FROM test_MULTIPOLYGON") + row = c.fetchone() + self.assertEqual(row[0], 1) + self.assertNotEqual(len(row[1]), 0) + finally: + c.execute("drop table if exists test_MULTIPOLYGON") + + def test_bug_2671682(self): + from MySQLdb.constants import ER + try: + self.cursor.execute("describe some_non_existent_table"); + except self.connection.ProgrammingError as msg: + self.assertTrue(str(ER.NO_SUCH_TABLE) in str(msg)) + + def test_bug_3514287(self): + c = self.cursor + try: + c.execute("""create table bug_3541287 ( + c1 CHAR(10), + t1 TIMESTAMP)""") + c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", + ("blah",)) + finally: + c.execute("drop table if exists bug_3541287") + + def test_ping(self): + self.connection.ping() + + def test_reraise_exception(self): + c = self.cursor + try: + c.execute("SELECT x FROM not_existing_table") + except MySQLdb.ProgrammingError as e: + self.assertEqual(e.args[0], 1146) + return + self.fail("Should raise ProgrammingError") + + def test_binary_prefix(self): + # verify prefix behaviour when enabled, disabled and for default (disabled) + for binary_prefix in (True, False, None): + kwargs = self.connect_kwargs.copy() + # needs to be set to can guarantee CHARSET response for normal strings + kwargs['charset'] = 'utf8' + if binary_prefix != None: + kwargs['binary_prefix'] = binary_prefix + + with closing(connection_factory(**kwargs)) as conn: + with closing(conn.cursor()) as c: + c.execute('SELECT CHARSET(%s)', (MySQLdb.Binary(b'raw bytes'),)) + self.assertEqual(c.fetchall()[0][0], 'binary' if binary_prefix else 'utf8') + # normal strings should not get prefix + c.execute('SELECT CHARSET(%s)', ('str',)) + self.assertEqual(c.fetchall()[0][0], 'utf8') + + +if __name__ == '__main__': + if test_MySQLdb.leak_test: + import gc + gc.enable() + gc.set_debug(gc.DEBUG_LEAK) + unittest.main() diff --git a/tests/test_MySQLdb_dbapi20.py b/tests/test_MySQLdb_dbapi20.py index 85fc5d5..1e808bd 100644 --- a/tests/test_MySQLdb_dbapi20.py +++ b/tests/test_MySQLdb_dbapi20.py @@ -20,7 +20,7 @@ class test_MySQLdb(dbapi20.DatabaseAPI20Test): test for an exception if the statement cannot return a result set. MySQL always returns a result set; it's just that some things return empty result sets.""" - + def test_fetchall(self): con = self._connect() try: @@ -66,10 +66,10 @@ class test_MySQLdb(dbapi20.DatabaseAPI20Test): 'cursor.fetchall should return an empty list if ' 'a select query returns no rows' ) - + finally: con.close() - + def test_fetchone(self): con = self._connect() try: @@ -148,7 +148,7 @@ class test_MySQLdb(dbapi20.DatabaseAPI20Test): def help_nextset_setUp(self,cur): ''' Should create a procedure called deleteme - that returns two result sets, first the + that returns two result sets, first the number of rows in booze then "name from booze" ''' sql=""" @@ -200,6 +200,6 @@ class test_MySQLdb(dbapi20.DatabaseAPI20Test): finally: con.close() - + if __name__ == '__main__': unittest.main()