This commit is contained in:
INADA Naoki
2015-11-05 03:38:13 +09:00
parent b07937487d
commit 98bec5fb8c
5 changed files with 238 additions and 237 deletions

View File

@@ -111,7 +111,7 @@ class BaseCursor(object):
if self._warnings:
# When there is next result, fetching warnings cause "command
# out of sync" error.
if self._result.has_next:
if self._result and self._result.has_next:
msg = "There are %d MySQL warnings." % (self._warnings,)
self.messages.append(msg)
warn(msg, self.Warning, 3)

View File

@@ -68,7 +68,7 @@ static PyObject *_mysql_IntegrityError;
static PyObject *_mysql_InternalError;
static PyObject *_mysql_ProgrammingError;
static PyObject *_mysql_NotSupportedError;
typedef struct {
PyObject_HEAD
MYSQL connection;
@@ -227,7 +227,7 @@ _mysql_Exception(_mysql_ConnectionObject *c)
Py_DECREF(t);
return NULL;
}
static char _mysql_server_init__doc__[] =
"Initialize embedded server. If this client is not linked against\n\
the embedded server library, this function does nothing.\n\
@@ -250,7 +250,7 @@ static PyObject *_mysql_server_init(
"already initialized");
return NULL;
}
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kwlist,
&cmd_args, &groups))
return NULL;
@@ -349,7 +349,7 @@ static PyObject *_mysql_server_end(
}
return _mysql_Exception(NULL);
}
#if MYSQL_VERSION_ID >= 32314
static char _mysql_thread_safe__doc__[] =
"Indicates whether the client is compiled as thread-safe.";
@@ -557,7 +557,7 @@ _mysql_ConnectionObject_Initialize(
char *init_command=NULL,
*read_default_file=NULL,
*read_default_group=NULL;
self->converter = NULL;
self->open = 0;
check_server_init(-1);
@@ -741,7 +741,7 @@ _mysql_connect(
PyObject *kwargs)
{
_mysql_ConnectionObject *c=NULL;
c = MyAlloc(_mysql_ConnectionObject, _mysql_ConnectionObject_Type);
if (c == NULL) return NULL;
if (_mysql_ConnectionObject_Initialize(c, args, kwargs)) {
@@ -1291,7 +1291,7 @@ _mysql_escape_dict(
Py_XDECREF(r);
return NULL;
}
static char _mysql_ResultObject_describe__doc__[] =
"Returns the sequence of 7-tuples required by the DB-API for\n\
the Cursor.description attribute.\n\
@@ -1328,7 +1328,7 @@ _mysql_ResultObject_describe(
Py_XDECREF(d);
return NULL;
}
static char _mysql_ResultObject_field_flags__doc__[] =
"Returns a tuple of field flags, one for each column in the result.\n\
" ;
@@ -1736,7 +1736,7 @@ _mysql_ConnectionObject_get_character_set_info(
{
PyObject *result;
MY_CHARSET_INFO cs;
if (!PyArg_ParseTuple(args, "")) return NULL;
check_connection(self);
mysql_get_character_set_info(&(self->connection), &cs);
@@ -2726,44 +2726,44 @@ PyTypeObject _mysql_ConnectionObject_Type = {
0, /* tp_setattr */
0, /*tp_compare*/
(reprfunc)_mysql_ConnectionObject_repr, /* tp_repr */
/* Method suites for standard classes */
0, /* (PyNumberMethods *) tp_as_number */
0, /* (PySequenceMethods *) tp_as_sequence */
0, /* (PyMappingMethods *) tp_as_mapping */
/* More standard operations (here for binary compatibility) */
0, /* (hashfunc) tp_hash */
0, /* (ternaryfunc) tp_call */
0, /* (reprfunc) tp_str */
(getattrofunc)_mysql_ConnectionObject_getattro, /* tp_getattro */
(setattrofunc)_mysql_ConnectionObject_setattro, /* tp_setattro */
/* Functions to access object as input/output buffer */
0, /* (PyBufferProcs *) tp_as_buffer */
/* (tp_flags) Flags to define presence of optional/expanded features */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE,
_mysql_connect__doc__, /* (char *) tp_doc Documentation string */
/* call function for all accessible objects */
(traverseproc) _mysql_ConnectionObject_traverse, /* tp_traverse */
/* delete references to contained objects */
(inquiry) _mysql_ConnectionObject_clear, /* tp_clear */
/* rich comparisons */
0, /* (richcmpfunc) tp_richcompare */
/* weak reference enabler */
0, /* (long) tp_weaklistoffset */
/* Iterators */
0, /* (getiterfunc) tp_iter */
0, /* (iternextfunc) tp_iternext */
/* Attribute descriptor and subclassing stuff */
(struct PyMethodDef *)_mysql_ConnectionObject_methods, /* tp_methods */
(struct PyMemberDef *)_mysql_ConnectionObject_memberlist, /* tp_members */
@@ -2798,45 +2798,45 @@ PyTypeObject _mysql_ResultObject_Type = {
0, /* tp_setattr */
0, /*tp_compare*/
(reprfunc)_mysql_ResultObject_repr, /* tp_repr */
/* Method suites for standard classes */
0, /* (PyNumberMethods *) tp_as_number */
0, /* (PySequenceMethods *) tp_as_sequence */
0, /* (PyMappingMethods *) tp_as_mapping */
/* More standard operations (here for binary compatibility) */
0, /* (hashfunc) tp_hash */
0, /* (ternaryfunc) tp_call */
0, /* (reprfunc) tp_str */
(getattrofunc)PyObject_GenericGetAttr, /* tp_getattro */
(setattrofunc)_mysql_ResultObject_setattro, /* tp_setattr */
/* Functions to access object as input/output buffer */
0, /* (PyBufferProcs *) tp_as_buffer */
/* Flags to define presence of optional/expanded features */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE,
_mysql_ResultObject__doc__, /* (char *) tp_doc Documentation string */
/* call function for all accessible objects */
(traverseproc) _mysql_ResultObject_traverse, /* tp_traverse */
/* delete references to contained objects */
(inquiry) _mysql_ResultObject_clear, /* tp_clear */
/* rich comparisons */
0, /* (richcmpfunc) tp_richcompare */
/* weak reference enabler */
0, /* (long) tp_weaklistoffset */
/* Iterators */
0, /* (getiterfunc) tp_iter */
0, /* (iternextfunc) tp_iternext */
/* Attribute descriptor and subclassing stuff */
(struct PyMethodDef *) _mysql_ResultObject_methods, /* tp_methods */
(struct PyMemberDef *) _mysql_ResultObject_memberlist, /*tp_members */

View File

@@ -10,16 +10,15 @@ connect_kwargs = dict(
read_default_group = "MySQLdb-tests",
)
def connection_kwargs(kwargs):
db_kwargs = connect_kwargs.copy()
db_kwargs.update(kwargs)
return db_kwargs
def connection_factory(**kwargs):
import MySQLdb
db_kwargs = connection_kwargs(kwargs)
db = MySQLdb.connect(**db_kwargs)
return db

View File

@@ -1,204 +1,205 @@
#!/usr/bin/env python
import dbapi20
import unittest
import MySQLdb
from configdb import connection_kwargs
import warnings
warnings.simplefilter("ignore")
class test_MySQLdb(dbapi20.DatabaseAPI20Test):
driver = MySQLdb
connect_args = ()
connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL"))
def test_setoutputsize(self): pass
def test_setoutputsize_basic(self): pass
def test_nextset(self): pass
"""The tests on fetchone and fetchall and rowcount bogusly
test for an exception if the statement cannot return a
result set. MySQL always returns a result set; it's just that
some things return empty result sets."""
def test_fetchall(self):
con = self._connect()
try:
cur = con.cursor()
# cursor.fetchall should raise an Error if called
# without executing a query that may return rows (such
# as a select)
self.assertRaises(self.driver.Error, cur.fetchall)
self.executeDDL1(cur)
for sql in self._populate():
cur.execute(sql)
# cursor.fetchall should raise an Error if called
# after executing a a statement that cannot return rows
## self.assertRaises(self.driver.Error,cur.fetchall)
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchall()
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.assertEqual(len(rows),len(self.samples),
'cursor.fetchall did not retrieve all rows'
)
rows = [r[0] for r in rows]
rows.sort()
for i in range(0,len(self.samples)):
self.assertEqual(rows[i],self.samples[i],
'cursor.fetchall retrieved incorrect rows'
)
rows = cur.fetchall()
self.assertEqual(
len(rows),0,
'cursor.fetchall should return an empty list if called '
'after the whole result set has been fetched'
)
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
rows = cur.fetchall()
self.assertTrue(cur.rowcount in (-1,0))
self.assertEqual(len(rows),0,
'cursor.fetchall should return an empty list if '
'a select query returns no rows'
)
finally:
con.close()
def test_fetchone(self):
con = self._connect()
try:
cur = con.cursor()
# cursor.fetchone should raise an Error if called before
# executing a select-type query
self.assertRaises(self.driver.Error,cur.fetchone)
# cursor.fetchone should raise an Error if called after
# executing a query that cannnot return rows
self.executeDDL1(cur)
## self.assertRaises(self.driver.Error,cur.fetchone)
cur.execute('select name from %sbooze' % self.table_prefix)
self.assertEqual(cur.fetchone(),None,
'cursor.fetchone should return None if a query retrieves '
'no rows'
)
self.assertTrue(cur.rowcount in (-1,0))
# cursor.fetchone should raise an Error if called after
# executing a query that cannnot return rows
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
## self.assertRaises(self.driver.Error,cur.fetchone)
cur.execute('select name from %sbooze' % self.table_prefix)
r = cur.fetchone()
self.assertEqual(len(r),1,
'cursor.fetchone should have retrieved a single row'
)
self.assertEqual(r[0],'Victoria Bitter',
'cursor.fetchone retrieved incorrect data'
)
## self.assertEqual(cur.fetchone(),None,
## 'cursor.fetchone should return None if no more rows available'
## )
self.assertTrue(cur.rowcount in (-1,1))
finally:
con.close()
# Same complaint as for fetchall and fetchone
def test_rowcount(self):
con = self._connect()
try:
cur = con.cursor()
self.executeDDL1(cur)
## self.assertEqual(cur.rowcount,-1,
## 'cursor.rowcount should be -1 after executing no-result '
## 'statements'
## )
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
## self.assertTrue(cur.rowcount in (-1,1),
## 'cursor.rowcount should == number or rows inserted, or '
## 'set to -1 after executing an insert statement'
## )
cur.execute("select name from %sbooze" % self.table_prefix)
self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number of rows returned, or '
'set to -1 after executing a select statement'
)
self.executeDDL2(cur)
## self.assertEqual(cur.rowcount,-1,
## 'cursor.rowcount not being reset to -1 after executing '
## 'no-result statements'
## )
finally:
con.close()
def test_callproc(self):
pass # performed in test_MySQL_capabilities
def help_nextset_setUp(self,cur):
''' Should create a procedure called deleteme
that returns two result sets, first the
number of rows in booze then "name from booze"
'''
sql="""
create procedure deleteme()
begin
select count(*) from %(tp)sbooze;
select name from %(tp)sbooze;
end
""" % dict(tp=self.table_prefix)
cur.execute(sql)
def help_nextset_tearDown(self,cur):
'If cleaning up is needed after nextSetTest'
cur.execute("drop procedure deleteme")
def test_nextset(self):
from warnings import warn
con = self._connect()
try:
cur = con.cursor()
if not hasattr(cur,'nextset'):
return
try:
self.executeDDL1(cur)
sql=self._populate()
for sql in self._populate():
cur.execute(sql)
self.help_nextset_setUp(cur)
cur.callproc('deleteme')
numberofrows=cur.fetchone()
assert numberofrows[0]== len(self.samples)
assert cur.nextset()
names=cur.fetchall()
assert len(names) == len(self.samples)
s=cur.nextset()
if s:
empty = cur.fetchall()
self.assertEquals(len(empty), 0,
"non-empty result set after other result sets")
#warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
# Warning)
#assert s == None,'No more return sets, should return None'
finally:
self.help_nextset_tearDown(cur)
finally:
con.close()
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python
import dbapi20
import unittest
import MySQLdb
from configdb import connection_kwargs
import warnings
warnings.simplefilter("ignore")
class test_MySQLdb(dbapi20.DatabaseAPI20Test):
driver = MySQLdb
connect_args = ()
connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL"))
def test_setoutputsize(self): pass
def test_setoutputsize_basic(self): pass
def test_nextset(self): pass
"""The tests on fetchone and fetchall and rowcount bogusly
test for an exception if the statement cannot return a
result set. MySQL always returns a result set; it's just that
some things return empty result sets."""
def test_fetchall(self):
con = self._connect()
try:
cur = con.cursor()
# cursor.fetchall should raise an Error if called
# without executing a query that may return rows (such
# as a select)
self.assertRaises(self.driver.Error, cur.fetchall)
self.executeDDL1(cur)
for sql in self._populate():
cur.execute(sql)
# cursor.fetchall should raise an Error if called
# after executing a a statement that cannot return rows
#self.assertRaises(self.driver.Error,cur.fetchall)
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchall()
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.assertEqual(len(rows),len(self.samples),
'cursor.fetchall did not retrieve all rows'
)
rows = [r[0] for r in rows]
rows.sort()
for i in range(0,len(self.samples)):
self.assertEqual(rows[i],self.samples[i],
'cursor.fetchall retrieved incorrect rows'
)
rows = cur.fetchall()
self.assertEqual(
len(rows),0,
'cursor.fetchall should return an empty list if called '
'after the whole result set has been fetched'
)
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
rows = cur.fetchall()
self.assertTrue(cur.rowcount in (-1,0))
self.assertEqual(len(rows),0,
'cursor.fetchall should return an empty list if '
'a select query returns no rows'
)
finally:
con.close()
def test_fetchone(self):
con = self._connect()
try:
cur = con.cursor()
# cursor.fetchone should raise an Error if called before
# executing a select-type query
self.assertRaises(self.driver.Error,cur.fetchone)
# cursor.fetchone should raise an Error if called after
# executing a query that cannnot return rows
self.executeDDL1(cur)
## self.assertRaises(self.driver.Error,cur.fetchone)
cur.execute('select name from %sbooze' % self.table_prefix)
self.assertEqual(cur.fetchone(),None,
'cursor.fetchone should return None if a query retrieves '
'no rows'
)
self.assertTrue(cur.rowcount in (-1,0))
# cursor.fetchone should raise an Error if called after
# executing a query that cannnot return rows
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
## self.assertRaises(self.driver.Error,cur.fetchone)
cur.execute('select name from %sbooze' % self.table_prefix)
r = cur.fetchone()
self.assertEqual(len(r),1,
'cursor.fetchone should have retrieved a single row'
)
self.assertEqual(r[0],'Victoria Bitter',
'cursor.fetchone retrieved incorrect data'
)
## self.assertEqual(cur.fetchone(),None,
## 'cursor.fetchone should return None if no more rows available'
## )
self.assertTrue(cur.rowcount in (-1,1))
finally:
con.close()
# Same complaint as for fetchall and fetchone
def test_rowcount(self):
con = self._connect()
try:
cur = con.cursor()
self.executeDDL1(cur)
## self.assertEqual(cur.rowcount,-1,
## 'cursor.rowcount should be -1 after executing no-result '
## 'statements'
## )
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
## self.assertTrue(cur.rowcount in (-1,1),
## 'cursor.rowcount should == number or rows inserted, or '
## 'set to -1 after executing an insert statement'
## )
cur.execute("select name from %sbooze" % self.table_prefix)
self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number of rows returned, or '
'set to -1 after executing a select statement'
)
self.executeDDL2(cur)
## self.assertEqual(cur.rowcount,-1,
## 'cursor.rowcount not being reset to -1 after executing '
## 'no-result statements'
## )
finally:
con.close()
def test_callproc(self):
pass # performed in test_MySQL_capabilities
def help_nextset_setUp(self,cur):
''' Should create a procedure called deleteme
that returns two result sets, first the
number of rows in booze then "name from booze"
'''
sql="""
create procedure deleteme()
begin
select count(*) from %(tp)sbooze;
select name from %(tp)sbooze;
end
""" % dict(tp=self.table_prefix)
cur.execute(sql)
def help_nextset_tearDown(self,cur):
'If cleaning up is needed after nextSetTest'
cur.execute("drop procedure deleteme")
def test_nextset(self):
#from warnings import warn
con = self._connect()
try:
cur = con.cursor()
if not hasattr(cur, 'nextset'):
return
try:
self.executeDDL1(cur)
sql=self._populate()
for sql in self._populate():
cur.execute(sql)
self.help_nextset_setUp(cur)
cur.callproc('deleteme')
numberofrows=cur.fetchone()
assert numberofrows[0]== len(self.samples)
assert cur.nextset()
names=cur.fetchall()
assert len(names) == len(self.samples)
s=cur.nextset()
if s:
empty = cur.fetchall()
self.assertEquals(len(empty), 0,
"non-empty result set after other result sets")
#warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
# Warning)
#assert s == None,'No more return sets, should return None'
finally:
self.help_nextset_tearDown(cur)
finally:
con.close()
if __name__ == '__main__':
unittest.main()

View File

@@ -7,6 +7,7 @@ from configdb import connection_factory
import warnings
warnings.simplefilter("ignore")
class TestDBAPISet(unittest.TestCase):
def test_set_equality(self):
self.assertTrue(MySQLdb.STRING == MySQLdb.STRING)
@@ -21,7 +22,7 @@ class TestDBAPISet(unittest.TestCase):
self.assertTrue(FIELD_TYPE.DATE != MySQLdb.STRING)
class CoreModule(unittest.TestCase):
class TestCoreModule(unittest.TestCase):
"""Core _mysql module features."""
def test_NULL(self):