code cleanup and reformat (#423)

This commit is contained in:
jnozsc
2020-01-20 02:47:22 -08:00
committed by Inada Naoki
parent 0f1ff4c9c0
commit 18163d7022
25 changed files with 1409 additions and 1067 deletions

View File

@ -1,23 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import capabilities
from datetime import timedelta
from contextlib import closing
import unittest
import MySQLdb
from MySQLdb import cursors
from configdb import connection_factory
import warnings
warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore")
class test_MySQLdb(capabilities.DatabaseTest):
db_module = MySQLdb
connect_args = ()
connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
connect_kwargs = dict(
use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL"
)
create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
leak_test = False
@ -25,97 +25,113 @@ class test_MySQLdb(capabilities.DatabaseTest):
return "`%s`" % ident
def test_TIME(self):
def generator(row,col):
return timedelta(0, row*8000)
self.check_data_integrity(
('col1 TIME',),
generator)
def generator(row, col):
return timedelta(0, row * 8000)
self.check_data_integrity(("col1 TIME",), generator)
def test_TINYINT(self):
# Number data
def generator(row, col):
v = (row*row) % 256
v = (row * row) % 256
if v > 127:
v = v-256
v = v - 256
return v
self.check_data_integrity(
('col1 TINYINT',),
generator)
self.check_data_integrity(("col1 TINYINT",), generator)
def test_stored_procedures(self):
db = self.connection
c = self.cursor
self.create_table(('pos INT', 'tree CHAR(20)'))
c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
list(enumerate('ash birch cedar Lärche pine'.split())))
self.create_table(("pos INT", "tree CHAR(20)"))
c.executemany(
"INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
list(enumerate("ash birch cedar Lärche pine".split())),
)
db.commit()
c.execute("""
c.execute(
"""
CREATE PROCEDURE test_sp(IN t VARCHAR(255))
BEGIN
SELECT pos FROM %s WHERE tree = t;
END
""" % self.table)
"""
% self.table
)
db.commit()
c.callproc('test_sp', ('Lärche',))
c.callproc("test_sp", ("Lärche",))
rows = c.fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0][0], 3)
c.nextset()
c.execute("DROP PROCEDURE test_sp")
c.execute('drop table %s' % (self.table))
c.execute("drop table %s" % (self.table))
def test_small_CHAR(self):
# Character data
def generator(row,col):
i = (row*col+62)%256
if i == 62: return ''
if i == 63: return None
def generator(row, col):
i = (row * col + 62) % 256
if i == 62:
return ""
if i == 63:
return None
return chr(i)
self.check_data_integrity(
('col1 char(1)','col2 char(1)'),
generator)
self.check_data_integrity(("col1 char(1)", "col2 char(1)"), generator)
def test_BIT(self):
c = self.cursor
try:
c.execute("""create table test_BIT (
c.execute(
"""create table test_BIT (
b3 BIT(3),
b7 BIT(10),
b64 BIT(64))""")
b64 BIT(64))"""
)
one64 = '1'*64
one64 = "1" * 64
c.execute(
"insert into test_BIT (b3, b7, b64)"
" VALUES (b'011', b'1111111111', b'%s')"
% one64)
" VALUES (b'011', b'1111111111', b'%s')" % one64
)
c.execute("SELECT b3, b7, b64 FROM test_BIT")
row = c.fetchone()
self.assertEqual(row[0], b'\x03')
self.assertEqual(row[1], b'\x03\xff')
self.assertEqual(row[2], b'\xff'*8)
self.assertEqual(row[0], b"\x03")
self.assertEqual(row[1], b"\x03\xff")
self.assertEqual(row[2], b"\xff" * 8)
finally:
c.execute("drop table if exists test_BIT")
def test_MULTIPOLYGON(self):
c = self.cursor
try:
c.execute("""create table test_MULTIPOLYGON (
c.execute(
"""create table test_MULTIPOLYGON (
id INTEGER PRIMARY KEY,
border MULTIPOLYGON)""")
border MULTIPOLYGON)"""
)
c.execute(
"insert into test_MULTIPOLYGON (id, border)"
" VALUES (1, GeomFromText('MULTIPOLYGON(((1 1, 1 -1, -1 -1, -1 1, 1 1)),((1 1, 3 1, 3 3, 1 3, 1 1)))'))"
"""
INSERT INTO test_MULTIPOLYGON
(id, border)
VALUES (1,
Geomfromtext(
'MULTIPOLYGON(((1 1, 1 -1, -1 -1, -1 1, 1 1)),((1 1, 3 1, 3 3, 1 3, 1 1)))'))
"""
)
c.execute("SELECT id, AsText(border) FROM test_MULTIPOLYGON")
row = c.fetchone()
self.assertEqual(row[0], 1)
self.assertEqual(row[1], 'MULTIPOLYGON(((1 1,1 -1,-1 -1,-1 1,1 1)),((1 1,3 1,3 3,1 3,1 1)))')
self.assertEqual(
row[1],
"MULTIPOLYGON(((1 1,1 -1,-1 -1,-1 1,1 1)),((1 1,3 1,3 3,1 3,1 1)))",
)
c.execute("SELECT id, AsWKB(border) FROM test_MULTIPOLYGON")
row = c.fetchone()
@ -131,19 +147,21 @@ class test_MySQLdb(capabilities.DatabaseTest):
def test_bug_2671682(self):
from MySQLdb.constants import ER
try:
self.cursor.execute("describe some_non_existent_table");
self.cursor.execute("describe some_non_existent_table")
except self.connection.ProgrammingError as msg:
self.assertTrue(str(ER.NO_SUCH_TABLE) in str(msg))
def test_bug_3514287(self):
c = self.cursor
try:
c.execute("""create table bug_3541287 (
c.execute(
"""create table bug_3541287 (
c1 CHAR(10),
t1 TIMESTAMP)""")
c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())",
("blah",))
t1 TIMESTAMP)"""
)
c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", ("blah",))
finally:
c.execute("drop table if exists bug_3541287")
@ -164,22 +182,25 @@ class test_MySQLdb(capabilities.DatabaseTest):
for binary_prefix in (True, False, None):
kwargs = self.connect_kwargs.copy()
# needs to be set to can guarantee CHARSET response for normal strings
kwargs['charset'] = 'utf8'
if binary_prefix != None:
kwargs['binary_prefix'] = binary_prefix
kwargs["charset"] = "utf8"
if binary_prefix is not None:
kwargs["binary_prefix"] = binary_prefix
with closing(connection_factory(**kwargs)) as conn:
with closing(conn.cursor()) as c:
c.execute('SELECT CHARSET(%s)', (MySQLdb.Binary(b'raw bytes'),))
self.assertEqual(c.fetchall()[0][0], 'binary' if binary_prefix else 'utf8')
c.execute("SELECT CHARSET(%s)", (MySQLdb.Binary(b"raw bytes"),))
self.assertEqual(
c.fetchall()[0][0], "binary" if binary_prefix else "utf8"
)
# normal strings should not get prefix
c.execute('SELECT CHARSET(%s)', ('str',))
self.assertEqual(c.fetchall()[0][0], 'utf8')
c.execute("SELECT CHARSET(%s)", ("str",))
self.assertEqual(c.fetchall()[0][0], "utf8")
if __name__ == '__main__':
if __name__ == "__main__":
if test_MySQLdb.leak_test:
import gc
gc.enable()
gc.set_debug(gc.DEBUG_LEAK)
unittest.main()