From 98bec5fb8ce5ff34373f1ab48090fc717a7e06ac Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 5 Nov 2015 03:38:13 +0900 Subject: [PATCH] fixup --- MySQLdb/cursors.py | 2 +- _mysql.c | 56 ++-- tests/configdb.py | 5 +- tests/test_MySQLdb_dbapi20.py | 409 +++++++++++++++--------------- tests/test_MySQLdb_nonstandard.py | 3 +- 5 files changed, 238 insertions(+), 237 deletions(-) diff --git a/MySQLdb/cursors.py b/MySQLdb/cursors.py index 355cfd4..e2d74af 100644 --- a/MySQLdb/cursors.py +++ b/MySQLdb/cursors.py @@ -111,7 +111,7 @@ class BaseCursor(object): if self._warnings: # When there is next result, fetching warnings cause "command # out of sync" error. - if self._result.has_next: + if self._result and self._result.has_next: msg = "There are %d MySQL warnings." % (self._warnings,) self.messages.append(msg) warn(msg, self.Warning, 3) diff --git a/_mysql.c b/_mysql.c index ae2e3a0..bcd15b8 100644 --- a/_mysql.c +++ b/_mysql.c @@ -68,7 +68,7 @@ static PyObject *_mysql_IntegrityError; static PyObject *_mysql_InternalError; static PyObject *_mysql_ProgrammingError; static PyObject *_mysql_NotSupportedError; - + typedef struct { PyObject_HEAD MYSQL connection; @@ -227,7 +227,7 @@ _mysql_Exception(_mysql_ConnectionObject *c) Py_DECREF(t); return NULL; } - + static char _mysql_server_init__doc__[] = "Initialize embedded server. If this client is not linked against\n\ the embedded server library, this function does nothing.\n\ @@ -250,7 +250,7 @@ static PyObject *_mysql_server_init( "already initialized"); return NULL; } - + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kwlist, &cmd_args, &groups)) return NULL; @@ -349,7 +349,7 @@ static PyObject *_mysql_server_end( } return _mysql_Exception(NULL); } - + #if MYSQL_VERSION_ID >= 32314 static char _mysql_thread_safe__doc__[] = "Indicates whether the client is compiled as thread-safe."; @@ -557,7 +557,7 @@ _mysql_ConnectionObject_Initialize( char *init_command=NULL, *read_default_file=NULL, *read_default_group=NULL; - + self->converter = NULL; self->open = 0; check_server_init(-1); @@ -741,7 +741,7 @@ _mysql_connect( PyObject *kwargs) { _mysql_ConnectionObject *c=NULL; - + c = MyAlloc(_mysql_ConnectionObject, _mysql_ConnectionObject_Type); if (c == NULL) return NULL; if (_mysql_ConnectionObject_Initialize(c, args, kwargs)) { @@ -1291,7 +1291,7 @@ _mysql_escape_dict( Py_XDECREF(r); return NULL; } - + static char _mysql_ResultObject_describe__doc__[] = "Returns the sequence of 7-tuples required by the DB-API for\n\ the Cursor.description attribute.\n\ @@ -1328,7 +1328,7 @@ _mysql_ResultObject_describe( Py_XDECREF(d); return NULL; } - + static char _mysql_ResultObject_field_flags__doc__[] = "Returns a tuple of field flags, one for each column in the result.\n\ " ; @@ -1736,7 +1736,7 @@ _mysql_ConnectionObject_get_character_set_info( { PyObject *result; MY_CHARSET_INFO cs; - + if (!PyArg_ParseTuple(args, "")) return NULL; check_connection(self); mysql_get_character_set_info(&(self->connection), &cs); @@ -2726,44 +2726,44 @@ PyTypeObject _mysql_ConnectionObject_Type = { 0, /* tp_setattr */ 0, /*tp_compare*/ (reprfunc)_mysql_ConnectionObject_repr, /* tp_repr */ - + /* Method suites for standard classes */ - + 0, /* (PyNumberMethods *) tp_as_number */ 0, /* (PySequenceMethods *) tp_as_sequence */ 0, /* (PyMappingMethods *) tp_as_mapping */ - + /* More standard operations (here for binary compatibility) */ - + 0, /* (hashfunc) tp_hash */ 0, /* (ternaryfunc) tp_call */ 0, /* (reprfunc) tp_str */ (getattrofunc)_mysql_ConnectionObject_getattro, /* tp_getattro */ (setattrofunc)_mysql_ConnectionObject_setattro, /* tp_setattro */ - + /* Functions to access object as input/output buffer */ 0, /* (PyBufferProcs *) tp_as_buffer */ - + /* (tp_flags) Flags to define presence of optional/expanded features */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE, _mysql_connect__doc__, /* (char *) tp_doc Documentation string */ /* call function for all accessible objects */ (traverseproc) _mysql_ConnectionObject_traverse, /* tp_traverse */ - + /* delete references to contained objects */ (inquiry) _mysql_ConnectionObject_clear, /* tp_clear */ /* rich comparisons */ 0, /* (richcmpfunc) tp_richcompare */ - + /* weak reference enabler */ 0, /* (long) tp_weaklistoffset */ /* Iterators */ 0, /* (getiterfunc) tp_iter */ 0, /* (iternextfunc) tp_iternext */ - + /* Attribute descriptor and subclassing stuff */ (struct PyMethodDef *)_mysql_ConnectionObject_methods, /* tp_methods */ (struct PyMemberDef *)_mysql_ConnectionObject_memberlist, /* tp_members */ @@ -2798,45 +2798,45 @@ PyTypeObject _mysql_ResultObject_Type = { 0, /* tp_setattr */ 0, /*tp_compare*/ (reprfunc)_mysql_ResultObject_repr, /* tp_repr */ - + /* Method suites for standard classes */ - + 0, /* (PyNumberMethods *) tp_as_number */ 0, /* (PySequenceMethods *) tp_as_sequence */ 0, /* (PyMappingMethods *) tp_as_mapping */ - + /* More standard operations (here for binary compatibility) */ - + 0, /* (hashfunc) tp_hash */ 0, /* (ternaryfunc) tp_call */ 0, /* (reprfunc) tp_str */ (getattrofunc)PyObject_GenericGetAttr, /* tp_getattro */ (setattrofunc)_mysql_ResultObject_setattro, /* tp_setattr */ - + /* Functions to access object as input/output buffer */ 0, /* (PyBufferProcs *) tp_as_buffer */ - + /* Flags to define presence of optional/expanded features */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE, - + _mysql_ResultObject__doc__, /* (char *) tp_doc Documentation string */ /* call function for all accessible objects */ (traverseproc) _mysql_ResultObject_traverse, /* tp_traverse */ - + /* delete references to contained objects */ (inquiry) _mysql_ResultObject_clear, /* tp_clear */ /* rich comparisons */ 0, /* (richcmpfunc) tp_richcompare */ - + /* weak reference enabler */ 0, /* (long) tp_weaklistoffset */ /* Iterators */ 0, /* (getiterfunc) tp_iter */ 0, /* (iternextfunc) tp_iternext */ - + /* Attribute descriptor and subclassing stuff */ (struct PyMethodDef *) _mysql_ResultObject_methods, /* tp_methods */ (struct PyMemberDef *) _mysql_ResultObject_memberlist, /*tp_members */ diff --git a/tests/configdb.py b/tests/configdb.py index cd6d43d..307cc3f 100644 --- a/tests/configdb.py +++ b/tests/configdb.py @@ -10,16 +10,15 @@ connect_kwargs = dict( read_default_group = "MySQLdb-tests", ) + def connection_kwargs(kwargs): db_kwargs = connect_kwargs.copy() db_kwargs.update(kwargs) return db_kwargs + def connection_factory(**kwargs): import MySQLdb db_kwargs = connection_kwargs(kwargs) db = MySQLdb.connect(**db_kwargs) return db - - - diff --git a/tests/test_MySQLdb_dbapi20.py b/tests/test_MySQLdb_dbapi20.py index 2832e32..d8598dd 100644 --- a/tests/test_MySQLdb_dbapi20.py +++ b/tests/test_MySQLdb_dbapi20.py @@ -1,204 +1,205 @@ -#!/usr/bin/env python -import dbapi20 -import unittest -import MySQLdb -from configdb import connection_kwargs -import warnings -warnings.simplefilter("ignore") - -class test_MySQLdb(dbapi20.DatabaseAPI20Test): - driver = MySQLdb - connect_args = () - connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")) - - def test_setoutputsize(self): pass - def test_setoutputsize_basic(self): pass - def test_nextset(self): pass - - """The tests on fetchone and fetchall and rowcount bogusly - test for an exception if the statement cannot return a - result set. MySQL always returns a result set; it's just that - some things return empty result sets.""" - - def test_fetchall(self): - con = self._connect() - try: - cur = con.cursor() - # cursor.fetchall should raise an Error if called - # without executing a query that may return rows (such - # as a select) - self.assertRaises(self.driver.Error, cur.fetchall) - - self.executeDDL1(cur) - for sql in self._populate(): - cur.execute(sql) - - # cursor.fetchall should raise an Error if called - # after executing a a statement that cannot return rows -## self.assertRaises(self.driver.Error,cur.fetchall) - - cur.execute('select name from %sbooze' % self.table_prefix) - rows = cur.fetchall() - self.assertTrue(cur.rowcount in (-1,len(self.samples))) - self.assertEqual(len(rows),len(self.samples), - 'cursor.fetchall did not retrieve all rows' - ) - rows = [r[0] for r in rows] - rows.sort() - for i in range(0,len(self.samples)): - self.assertEqual(rows[i],self.samples[i], - 'cursor.fetchall retrieved incorrect rows' - ) - rows = cur.fetchall() - self.assertEqual( - len(rows),0, - 'cursor.fetchall should return an empty list if called ' - 'after the whole result set has been fetched' - ) - self.assertTrue(cur.rowcount in (-1,len(self.samples))) - - self.executeDDL2(cur) - cur.execute('select name from %sbarflys' % self.table_prefix) - rows = cur.fetchall() - self.assertTrue(cur.rowcount in (-1,0)) - self.assertEqual(len(rows),0, - 'cursor.fetchall should return an empty list if ' - 'a select query returns no rows' - ) - - finally: - con.close() - - def test_fetchone(self): - con = self._connect() - try: - cur = con.cursor() - - # cursor.fetchone should raise an Error if called before - # executing a select-type query - self.assertRaises(self.driver.Error,cur.fetchone) - - # cursor.fetchone should raise an Error if called after - # executing a query that cannnot return rows - self.executeDDL1(cur) -## self.assertRaises(self.driver.Error,cur.fetchone) - - cur.execute('select name from %sbooze' % self.table_prefix) - self.assertEqual(cur.fetchone(),None, - 'cursor.fetchone should return None if a query retrieves ' - 'no rows' - ) - self.assertTrue(cur.rowcount in (-1,0)) - - # cursor.fetchone should raise an Error if called after - # executing a query that cannnot return rows - cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( - self.table_prefix - )) -## self.assertRaises(self.driver.Error,cur.fetchone) - - cur.execute('select name from %sbooze' % self.table_prefix) - r = cur.fetchone() - self.assertEqual(len(r),1, - 'cursor.fetchone should have retrieved a single row' - ) - self.assertEqual(r[0],'Victoria Bitter', - 'cursor.fetchone retrieved incorrect data' - ) -## self.assertEqual(cur.fetchone(),None, -## 'cursor.fetchone should return None if no more rows available' -## ) - self.assertTrue(cur.rowcount in (-1,1)) - finally: - con.close() - - # Same complaint as for fetchall and fetchone - def test_rowcount(self): - con = self._connect() - try: - cur = con.cursor() - self.executeDDL1(cur) -## self.assertEqual(cur.rowcount,-1, -## 'cursor.rowcount should be -1 after executing no-result ' -## 'statements' -## ) - cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( - self.table_prefix - )) -## self.assertTrue(cur.rowcount in (-1,1), -## 'cursor.rowcount should == number or rows inserted, or ' -## 'set to -1 after executing an insert statement' -## ) - cur.execute("select name from %sbooze" % self.table_prefix) - self.assertTrue(cur.rowcount in (-1,1), - 'cursor.rowcount should == number of rows returned, or ' - 'set to -1 after executing a select statement' - ) - self.executeDDL2(cur) -## self.assertEqual(cur.rowcount,-1, -## 'cursor.rowcount not being reset to -1 after executing ' -## 'no-result statements' -## ) - finally: - con.close() - - def test_callproc(self): - pass # performed in test_MySQL_capabilities - - def help_nextset_setUp(self,cur): - ''' Should create a procedure called deleteme - that returns two result sets, first the - number of rows in booze then "name from booze" - ''' - sql=""" - create procedure deleteme() - begin - select count(*) from %(tp)sbooze; - select name from %(tp)sbooze; - end - """ % dict(tp=self.table_prefix) - cur.execute(sql) - - def help_nextset_tearDown(self,cur): - 'If cleaning up is needed after nextSetTest' - cur.execute("drop procedure deleteme") - - def test_nextset(self): - from warnings import warn - con = self._connect() - try: - cur = con.cursor() - if not hasattr(cur,'nextset'): - return - - try: - self.executeDDL1(cur) - sql=self._populate() - for sql in self._populate(): - cur.execute(sql) - - self.help_nextset_setUp(cur) - - cur.callproc('deleteme') - numberofrows=cur.fetchone() - assert numberofrows[0]== len(self.samples) - assert cur.nextset() - names=cur.fetchall() - assert len(names) == len(self.samples) - s=cur.nextset() - if s: - empty = cur.fetchall() - self.assertEquals(len(empty), 0, - "non-empty result set after other result sets") - #warn("Incompatibility: MySQL returns an empty result set for the CALL itself", - # Warning) - #assert s == None,'No more return sets, should return None' - finally: - self.help_nextset_tearDown(cur) - - finally: - con.close() - - -if __name__ == '__main__': - unittest.main() +#!/usr/bin/env python +import dbapi20 +import unittest +import MySQLdb +from configdb import connection_kwargs +import warnings +warnings.simplefilter("ignore") + + +class test_MySQLdb(dbapi20.DatabaseAPI20Test): + driver = MySQLdb + connect_args = () + connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")) + + def test_setoutputsize(self): pass + def test_setoutputsize_basic(self): pass + def test_nextset(self): pass + + """The tests on fetchone and fetchall and rowcount bogusly + test for an exception if the statement cannot return a + result set. MySQL always returns a result set; it's just that + some things return empty result sets.""" + + def test_fetchall(self): + con = self._connect() + try: + cur = con.cursor() + # cursor.fetchall should raise an Error if called + # without executing a query that may return rows (such + # as a select) + self.assertRaises(self.driver.Error, cur.fetchall) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + # cursor.fetchall should raise an Error if called + # after executing a a statement that cannot return rows + #self.assertRaises(self.driver.Error,cur.fetchall) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + self.assertEqual(len(rows),len(self.samples), + 'cursor.fetchall did not retrieve all rows' + ) + rows = [r[0] for r in rows] + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'cursor.fetchall retrieved incorrect rows' + ) + rows = cur.fetchall() + self.assertEqual( + len(rows),0, + 'cursor.fetchall should return an empty list if called ' + 'after the whole result set has been fetched' + ) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,0)) + self.assertEqual(len(rows),0, + 'cursor.fetchall should return an empty list if ' + 'a select query returns no rows' + ) + + finally: + con.close() + + def test_fetchone(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchone should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error,cur.fetchone) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if a query retrieves ' + 'no rows' + ) + self.assertTrue(cur.rowcount in (-1,0)) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchone() + self.assertEqual(len(r),1, + 'cursor.fetchone should have retrieved a single row' + ) + self.assertEqual(r[0],'Victoria Bitter', + 'cursor.fetchone retrieved incorrect data' + ) +## self.assertEqual(cur.fetchone(),None, +## 'cursor.fetchone should return None if no more rows available' +## ) + self.assertTrue(cur.rowcount in (-1,1)) + finally: + con.close() + + # Same complaint as for fetchall and fetchone + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount should be -1 after executing no-result ' +## 'statements' +## ) + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertTrue(cur.rowcount in (-1,1), +## 'cursor.rowcount should == number or rows inserted, or ' +## 'set to -1 after executing an insert statement' +## ) + cur.execute("select name from %sbooze" % self.table_prefix) + self.assertTrue(cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount not being reset to -1 after executing ' +## 'no-result statements' +## ) + finally: + con.close() + + def test_callproc(self): + pass # performed in test_MySQL_capabilities + + def help_nextset_setUp(self,cur): + ''' Should create a procedure called deleteme + that returns two result sets, first the + number of rows in booze then "name from booze" + ''' + sql=""" + create procedure deleteme() + begin + select count(*) from %(tp)sbooze; + select name from %(tp)sbooze; + end + """ % dict(tp=self.table_prefix) + cur.execute(sql) + + def help_nextset_tearDown(self,cur): + 'If cleaning up is needed after nextSetTest' + cur.execute("drop procedure deleteme") + + def test_nextset(self): + #from warnings import warn + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur, 'nextset'): + return + + try: + self.executeDDL1(cur) + sql=self._populate() + for sql in self._populate(): + cur.execute(sql) + + self.help_nextset_setUp(cur) + + cur.callproc('deleteme') + numberofrows=cur.fetchone() + assert numberofrows[0]== len(self.samples) + assert cur.nextset() + names=cur.fetchall() + assert len(names) == len(self.samples) + s=cur.nextset() + if s: + empty = cur.fetchall() + self.assertEquals(len(empty), 0, + "non-empty result set after other result sets") + #warn("Incompatibility: MySQL returns an empty result set for the CALL itself", + # Warning) + #assert s == None,'No more return sets, should return None' + finally: + self.help_nextset_tearDown(cur) + + finally: + con.close() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_MySQLdb_nonstandard.py b/tests/test_MySQLdb_nonstandard.py index d92b260..2ca5a54 100644 --- a/tests/test_MySQLdb_nonstandard.py +++ b/tests/test_MySQLdb_nonstandard.py @@ -7,6 +7,7 @@ from configdb import connection_factory import warnings warnings.simplefilter("ignore") + class TestDBAPISet(unittest.TestCase): def test_set_equality(self): self.assertTrue(MySQLdb.STRING == MySQLdb.STRING) @@ -21,7 +22,7 @@ class TestDBAPISet(unittest.TestCase): self.assertTrue(FIELD_TYPE.DATE != MySQLdb.STRING) -class CoreModule(unittest.TestCase): +class TestCoreModule(unittest.TestCase): """Core _mysql module features.""" def test_NULL(self):