J'avais l'habitude de créer une version python de sqlite3. J'avais l'habitude de l'appeler pysql, mais pour une raison quelconque, j'ai supprimé la dernière version. Cependant, les tout premiers sont restés dans le blog, donc je les posterai ici. La seule fonction à ce stade est la fonction de lecture du fichier db. La version de sqlite3 utilisée pour créer le fichier db à lire est:
$ sqlite3 --version
3.7.9 2011-11-01 00:52:41 c7c6050ef060877ebe77b41d959e9df13f8c9b5e
Écrivez une table de fruits semblable à la suivante dans le fichier test.db.
$ sqlite3 test.db
SQLite version 3.7.9 2011-11-01 00:52:41
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table fruits(name string, value integer);
sqlite> insert into fruits values('apple', 100);
sqlite> insert into fruits values('orange', 130);
sqlite> .q
Lisons ceci en utilisant pysql.
$ ./pysql.py test.db fruits 0
('apple',)
('orange',)
$ ./pysql.py test.db fruits 1
(100,)
(130,)
$ ./pysql.py test.db fruits
('orange', 130)
$ ./pysql.py test.db fruits 0 1
('apple', 100)
('orange', 130)
$ ./pysql.py test.db fruits 1 0
(100, 'apple')
(130, 'orange')
J'ai réussi à bouger. Si vous faites une erreur dans le nom de la table, etc., le programme s'arrêtera d'un coup, mais le code source est le suivant.
pysql.py
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import bitstring
HEADER_OFFSET_PAGE1 = 100
#page type
INTKEY = 0x01
ZERO_DATA = 0x02
LEAF_DATA = 0x04
LEAF = 0x08
TABLES = {'sqlite_master':(1,
"""CREATE sqlite_master(
type text,
name text,
tbl_name text,
rootpage integer,
sql text)'
""")}
SIZE = [0,1,2,3,4,6,8,8,0,0,0,0]
def get_fieldsize(serial_type):
if serial_type >= 12:
return (serial_type-12)/2
else:
return SIZE[serial_type];
class Pager(object):
def __init__(self, fname):
self.fp = bitstring.ConstBitStream(filename=fname)
self.pagesize = self.get_pagesize()
self.pages = {}
self.fp.pos = 20*8
nReserve = self.fp.read('uint:8')
self.usableSize = self.pagesize - nReserve
self.maxLeaf = self.usableSize - 35
self.minLeaf = (self.usableSize - 12) * 32/255 - 23
self.maxLocal = (self.usableSize - 12) * 64/255 - 23
self.minLocal = self.minLeaf
def read(self, type_fmt, pos):
self.fp.pos = pos
return self.fp.read(type_fmt)
def getPage(self, iTab):
page = self.pages.get(iTab)
if page is None:
page = Page(self, iTab)
return page
# primitive
def get2byte(self):
return self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
def get4byte(self):
return self.fp.read('uint:8') << 24 | self.fp.read('uint:8') << 16 |\
self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
def get_pagesize(self):
self.fp.pos = 16*8
return self.fp.read('uint:8') << 8 | self.fp.read('uint:8') << 16
def getVarint(self):
p = []
p.append(self.fp.read('uint:8'))
if not (p[0] & 0x80):
return p[0], 1
p.append(self.fp.read('uint:8'))
if not (p[1] & 0x80):
v = p[0] & 0x7f
v <<= 7
v |= p[1]
return v, 2
p.append(self.fp.read('uint:8'))
if not (p[2] & 0x80):
v = p[0] & 0x7f
v <<= 7
v |= p[1] & 0x7f
v <<= 7
v |= p[2] & 0x7f
return v, 3
raise Exception('too long')
def set_cellsize(self, page):
self.fp.pos = (page.pos + page.hdroffset + 3)*8
page.nCell = self.get2byte()
def get_pagetype(self, page):
self.fp.pos = (page.hdroffset + self.pagesize * (page.pageno-1)) * 8
return self.fp.read('uint:8')
def find_cell_offset(self, iCell, page):
mask = self.pagesize - 0x01
celloffset = page.pos + page.hdroffset + 8 + page.childPtrSize
if iCell == page.nCell:
self.fp.pos = (celloffset-4)*8
return self.fp.pos
self.fp.pos = (celloffset + iCell*2)*8
self.fp.pos = (page.pos + (mask & self.get2byte()))*8
return self.fp.pos
MAX_DEPTH = 20
class Cursor(object):
def __init__(self, fp, pgno):
self.fp = fp
self.pgno = pgno
self.cell = None
self.pages = [None]*MAX_DEPTH
self.iCells = [None]*MAX_DEPTH
self.depth = -1
def moveToLeftMost(self):
page = self.fp.getPage(self.pgno)
self.depth += 1
self.pages[self.depth] = page
self.iCells[self.depth] = 0
while not page.leaf:
self.depth += 1
self.iCells[self.depth] = 0
page = page.find_entry(self.fp, 0)
self.pages[self.depth] = page
assert(page.leaf)
if page.nCell == 0:# for empty table
raise StopIteration
self.cell = page.find_entry(self.fp, self.iCells[self.depth])
def moveNextLeaf(self):
page = self.pages[self.depth]
self.iCells[self.depth] += 1
iCell = self.iCells[self.depth]
if iCell > page.nCell - page.leaf:
if self.depth == 0:
raise StopIteration
self.depth -= 1
self.pgno = page.pageno
return self.moveNextLeaf()
else:
entry = page.find_entry(self.fp, iCell)
entry.setcell(self)
return self.cell
def next(self):
if self.cell is None:
self.moveToLeftMost()
return self.cell
else:
return self.moveNextLeaf()
def __iter__(self):
return self
def moveTo(self, iCell, pgno=None):
if pgno is None:
page = self.pages[self.depth]
else:
page = self.fp.getPage(pgno)
self.pages[self.depth] = page
assert(page.leaf)
self.iCells[self.depth] = iCell
self.cell = page.find_entry(self.fp, iCell)
class Page(object):
def __init__(self, pager, pageno):
if pageno == 1:
self.hdroffset = HEADER_OFFSET_PAGE1
else:
self.hdroffset = 0
self.pageno = pageno
leaf = False
childPtrSize = 4
if LEAF & pager.get_pagetype(self):
leaf = True
childPtrSize = 0
self.leaf = leaf
self.childPtrSize = childPtrSize
self.pos = pager.pagesize*(pageno-1)
self.maxLocal = pager.maxLeaf
self.minLocal = pager.minLeaf
self.nCell = None
self.nField = None
pager.set_cellsize(self)
pager.pages[pageno] = self
def find_entry(self, fp, iCell):
pos = fp.find_cell_offset(iCell, self)
if not self.leaf:
pgno = fp.get4byte()
return fp.getPage(pgno)
n = 0
nPayload, tn = fp.getVarint()
n += tn
intKey, tn = fp.getVarint()
n += tn
cell_hdr_offset = n
keyoff, tn = fp.getVarint()
n = tn
stypes = []
while n < keyoff:
serial_type, tn = fp.getVarint()
n += tn
stypes.append(serial_type)
if nPayload <= self.maxLocal:
nLocal = nPayload
else:
minLocal = self.minLocal
maxLocal = self.maxLocal
surplus = minLocal + (nPayload - minLocal) % (fp.usableSize - 4)
if surplus <= maxLocal:
nLocal = surplus
else:
nLocal = minLocal
return Cell(self, pos, nPayload, intKey, cell_hdr_offset, keyoff, stypes, nLocal)
def setcell(self, cursor):
cursor.pgno = self.pageno
cursor.moveToLeftMost()
class Cell(object):
def __init__(self, page, pos, nPayload, rowid, hdr_size, keyoffset, stypes, nLocal):
self.parent = page
offset = (hdr_size + keyoffset)*8
self.pos = pos
self.hdr = hdr_size*8
self.nPayload = nPayload
self.rowid = rowid
self.stypes = stypes
self.nLocal = nLocal
self.nField = len(stypes)
self.offsets = [offset]
for serial_type in stypes:
offset += get_fieldsize(serial_type)*8
self.offsets.append(offset)
def getvalue(self, fp, iField):
serial_type = self.stypes[iField]
offset = self.offsets[iField]
payload_size = get_fieldsize(serial_type)
if serial_type == 0 or serial_type == 10 or serial_type == 11:
return None
elif 1 <= serial_type and serial_type <= 6:
return fp.read('int:%d' % (payload_size*8), self.pos + offset)
else:
page = self.parent
if payload_size > self.nLocal:
ovflSize = fp.usableSize - 4
keyoffset = (offset - self.hdr)/8
size = self.nLocal - keyoffset
fp.fp.pos = self.pos + self.hdr + self.nLocal * 8
npgno = fp.get4byte()
buf = [fp.read('bytes:%d' % size, self.pos+offset)]
nOverflow = (payload_size-self.nLocal+ovflSize-1)/ovflSize
payload_size -= size
i = keyoffset/ovflSize
while payload_size > 0 and npgno != 0:
page = fp.getPage(npgno)
pos = (page.pos+4)*8
if payload_size > ovflSize:
nbytes = ovflSize
payload_size -= ovflSize
else:
nbytes = payload_size
payload_size = 0
buf.append(fp.read('bytes:%d' % nbytes, pos))
i+=1
fp.fp.pos = fp.pagesize*(npgno-1)*8
npgno = fp.get4byte()
if payload_size != 0:
raise Exception("database file is broken")
return ''.join(buf)
return fp.read('bytes:%d' % payload_size, self.pos+offset)
def setcell(self, cursor):
cursor.cell = self
def get_rootpageno(tname):
rootpage, sql = TABLES[tname]
return rootpage
def tables_add(row):
if row[0] == 'table':
TABLES[row[1]] = (row[3], row[4])
def printf(row):
print row
def init_db(fname):
iTab = 1
fp = Pager(fname)
cursor = Cursor(fp, iTab)
for cell in cursor:
tables_add([cell.getvalue(fp, i) for i in range(5)])
return fp
def main(fname, tabname=None, *argv):
fp = init_db(fname)
if tabname is None:
tabname = 'sqlite_master'
iTab = get_rootpageno(tabname)
cursor = Cursor(fp, iTab)
cursor.moveToLeftMost()
cell = cursor.cell
if argv == ():
indices = range(cell.nField)
else:
indices = []
for idx in argv:
indices.append(int(idx) % cell.nField)
print tuple([cell.getvalue(fp, idx) for idx in indices])
for cell in cursor:
print tuple([cell.getvalue(fp, idx) for idx in indices])
class DB(object):
def __init__(self, filename):
self.pager = init_db(filename)
def find(self, dic):
cols = dic['cols']
cursor = Cursor(self.pager, get_rootpageno(dic['from']))
offset = dic.get('offset')
if offset is not None:
for i in range(offset):
next(cursor)
limit = dic.get('limit')
n = 0
for cell in cursor:
if limit is not None and limit == n:
raise StopIteration
values = []
for col in cols:
values.append(cell.getvalue(self.pager, col))
yield values
n += 1
import sys
if __name__ == '__main__':
argc = len(sys.argv)
if argc < 2 :
print "usage:%s <dabasefile> <table>? <cols>?" % sys.argv[0]
sys.exit(1)
main(*sys.argv[1:])