Added a _mysql.quote_row() function which takes a row of data as a tuple

and quotes it, converting None into NULL.

MySQLdb.py is the DBI-1.1 compatible wrapper.

To do: Type conversions for fetch_row().
This commit is contained in:
adustman
1999-03-16 05:22:46 +00:00
parent 563ab10307
commit 161f66f556
2 changed files with 177 additions and 5 deletions

114
mysql/MySQLdb.py Normal file
View File

@ -0,0 +1,114 @@
import _mysql
from _mysql import *
from DateTime import Date, Time, Timestamp
from types import StringType, ListType, TupleType
threadsafety = 1
apllevel = "1.1"
class Cursor:
def __init__(self, connection, name=''):
self.connection = connection
self.name = name
self.description = None
self.rowcount = -1
self.result = None
self.arraysize = None
self.warnings = 1
def setinputsizes(self, size): pass
def setoutputsizes(self, size): pass
def execute(self, query, args=None):
from types import ListType, TupleType
from string import rfind, join, split, atoi
db = self.connection.db
if not args:
db.query(query)
elif type(args) is not ListType:
db.query(query % escape_row(args))
else:
p = rfind(query, '(')
if p == -1: raise ProgrammingError, "can't find values"
n = len(args)-1
q = [query % escape_row(args[0])]
qv = query[p:]
for a in args[1:]: q.append(qv % escape_row(a))
q = join(q, ',\n')
print q
db.query(q)
self.result = db.store_result()
if self.result:
self.description = self.result.describe()
self.rowcount = self.result.num_rows()
else:
self.description = None
self.rowcount = -1
if self.warnings:
w = db.info()
if w:
warnings = atoi(split(w)[-1])
if warnings:
raise Warning, w
def fetchone(self):
try:
return self.result.fetch_row()
except AttributeError:
raise ProgrammingError, "no query executed yet"
def fetchmany(self, size=None):
size = size or self.inputsizes or 1
rows = []
for i in range(size):
row = self.fetchone()
if not row: break
rows.append(row)
return rows
def fetchall(self):
rows = []
while 1:
row = self.fetchone()
if not row: break
rows.append(row)
return rows
def nextset(self): pass
def Raw(s): return s
STRING = FIELD_TYPE.STRING
NUMBER = FIELD_TYPE.LONG
TIME = FIELD_TYPE.TIME
TIMESTAMP = FIELD_TYPE.TIMESTAMP
ROW_ID = FIELD_TYPE.LONG
class Connection:
CursorClass = Cursor
def __init__(self, dsn=None, user=None, password=None,
host=None, database=None, **kwargs):
newargs = {}
if user: newargs['user'] = user
if password: newargs['passwd'] = password
if host: newargs['host'] = host
if database: newargs['db'] = database
newargs.update(kwargs)
self.db = apply(_mysql.connect, (), newargs)
def close(self):
self.db.close()
def commit(self): pass
def cursor(self, name=''):
return self.CursorClass(self, name)
Connect = connect = Connection

View File

@ -41,18 +41,26 @@ _mysql_Exception(c)
merr = mysql_errno(&(c->connection)); merr = mysql_errno(&(c->connection));
if (!merr) if (!merr)
e = _mysql_InterfaceError; e = _mysql_InterfaceError;
else if (merr < CR_MIN_ERROR)
e = _mysql_InternalError;
else if (merr > CR_MAX_ERROR) { else if (merr > CR_MAX_ERROR) {
PyTuple_SET_ITEM(t, 0, PyInt_FromLong(-1L)); PyTuple_SET_ITEM(t, 0, PyInt_FromLong(-1L));
PyTuple_SET_ITEM(t, 1, PyString_FromString("error totally whack")); PyTuple_SET_ITEM(t, 1, PyString_FromString("error totally whack"));
PyErr_SetObject(_mysql_Error, t); PyErr_SetObject(_mysql_Error, t);
return NULL; return NULL;
} }
else if (merr == CR_COMMANDS_OUT_OF_SYNC) else switch (merr) {
case CR_COMMANDS_OUT_OF_SYNC:
e = _mysql_ProgrammingError; e = _mysql_ProgrammingError;
break;
case ER_DUP_ENTRY:
e = _mysql_IntegrityError;
break;
default:
if (merr < 1000)
e = _mysql_InternalError;
else else
e = _mysql_OperationalError; e = _mysql_OperationalError;
break;
}
PyTuple_SET_ITEM(t, 0, PyInt_FromLong((long)merr)); PyTuple_SET_ITEM(t, 0, PyInt_FromLong((long)merr));
PyTuple_SET_ITEM(t, 1, PyString_FromString(mysql_error(&(c->connection)))); PyTuple_SET_ITEM(t, 1, PyString_FromString(mysql_error(&(c->connection))));
PyErr_SetObject(e, t); PyErr_SetObject(e, t);
@ -437,6 +445,52 @@ _mysql_escape_string(self, args)
return (str); return (str);
} }
static PyObject *_mysql_NULL;
static PyObject *
_mysql_escape_row(self, args)
PyObject *self;
PyObject *args;
{
PyObject *o, *r, *item, *quoted, *str, *itemstr;
char *in, *out;
int i, n, len, size;
if (!PyArg_ParseTuple(args, "O:escape_row", &o)) return NULL;
if (!PySequence_Check(o)) return NULL;
if (!(n = PyObject_Length(o))) return NULL;
if (!(r = PyTuple_New(n))) return NULL;
for (i=0; i<n; i++) {
if (!(item = PySequence_GetItem(o, i))) goto error;
if (item == Py_None) {
quoted = _mysql_NULL;
Py_INCREF(_mysql_NULL);
}
else {
if (!(itemstr = PyObject_Str(item)))
goto error;
in = PyString_AsString(itemstr);
size = PyString_Size(itemstr);
str = PyString_FromStringAndSize((char *)NULL, size*2+3);
if (!str) return PyErr_NoMemory();
out = PyString_AS_STRING(str);
len = mysql_escape_string(out+1, in, size);
*out = '\'' ;
*(out+len+1) = '\'' ;
*(out+len+2) = 0;
if (_PyString_Resize(&str, len+2) < 0)
goto error;
Py_DECREF(itemstr);
quoted = str;
}
PyTuple_SET_ITEM(r, i, quoted);
}
return r;
error:
Py_XDECREF(r);
return NULL;
}
static PyObject * static PyObject *
_mysql_ResultObject_describe(self, args) _mysql_ResultObject_describe(self, args)
_mysql_ResultObject *self; _mysql_ResultObject *self;
@ -1065,6 +1119,7 @@ PyTypeObject _mysql_ResultObject_Type = {
static PyMethodDef static PyMethodDef
_mysql_methods[] = { _mysql_methods[] = {
{ "connect", _mysql_connect, METH_VARARGS | METH_KEYWORDS }, { "connect", _mysql_connect, METH_VARARGS | METH_KEYWORDS },
{ "escape_row", _mysql_escape_row, METH_VARARGS },
{ "escape_string", _mysql_escape_string, METH_VARARGS }, { "escape_string", _mysql_escape_string, METH_VARARGS },
{ "get_client_info", _mysql_get_client_info, METH_VARARGS }, { "get_client_info", _mysql_get_client_info, METH_VARARGS },
{NULL, NULL} /* sentinel */ {NULL, NULL} /* sentinel */
@ -1148,6 +1203,9 @@ init_mysql()
goto error; goto error;
if (_mysql_Constant_class(dict, "ER", _mysql_Constant_er)) if (_mysql_Constant_class(dict, "ER", _mysql_Constant_er))
goto error; goto error;
if (!(_mysql_NULL = PyString_FromString("NULL")))
goto error;
if (PyDict_SetItemString(dict, "NULL", _mysql_NULL)) goto error;
error: error:
if (PyErr_Occurred()) if (PyErr_Occurred())
PyErr_SetString(PyExc_ImportError, PyErr_SetString(PyExc_ImportError,