lecteur sqlite3

J'avais l'habitude de créer une version python de sqlite3. J'avais l'habitude de l'appeler pysql, mais pour une raison quelconque, j'ai supprimé la dernière version. Cependant, les tout premiers sont restés dans le blog, donc je les posterai ici. La seule fonction à ce stade est la fonction de lecture du fichier db. La version de sqlite3 utilisée pour créer le fichier db à lire est:

$ sqlite3 --version
3.7.9 2011-11-01 00:52:41 c7c6050ef060877ebe77b41d959e9df13f8c9b5e

Écrivez une table de fruits semblable à la suivante dans le fichier test.db.

$ sqlite3 test.db
SQLite version 3.7.9 2011-11-01 00:52:41
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table fruits(name string, value integer);
sqlite> insert into fruits values('apple', 100);
sqlite> insert into fruits values('orange', 130);
sqlite> .q

Lisons ceci en utilisant pysql.

$ ./pysql.py test.db fruits 0
('apple',)
('orange',)
$ ./pysql.py test.db fruits 1
(100,)
(130,)
$ ./pysql.py test.db fruits 
('orange', 130)
$ ./pysql.py test.db fruits 0 1
('apple', 100)
('orange', 130)
$ ./pysql.py test.db fruits 1 0
(100, 'apple')
(130, 'orange')

J'ai réussi à bouger. Si vous faites une erreur dans le nom de la table, etc., le programme s'arrêtera d'un coup, mais le code source est le suivant.

pysql.py


#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import bitstring
HEADER_OFFSET_PAGE1 = 100
#page type
INTKEY = 0x01
ZERO_DATA = 0x02
LEAF_DATA = 0x04
LEAF = 0x08
TABLES = {'sqlite_master':(1, 
"""CREATE sqlite_master(
                type text, 
                name text, 
                tbl_name text, 
                rootpage integer, 
                sql text)'
""")}
SIZE = [0,1,2,3,4,6,8,8,0,0,0,0]
def get_fieldsize(serial_type):
    if serial_type >= 12:
        return (serial_type-12)/2
    else:
        return SIZE[serial_type];
class Pager(object):
    def __init__(self, fname):
        self.fp = bitstring.ConstBitStream(filename=fname)
        self.pagesize = self.get_pagesize()
        self.pages = {}
        self.fp.pos = 20*8

        nReserve = self.fp.read('uint:8')
        self.usableSize = self.pagesize - nReserve
        self.maxLeaf = self.usableSize - 35
        self.minLeaf = (self.usableSize - 12) * 32/255 - 23
        self.maxLocal = (self.usableSize - 12) * 64/255 - 23
        self.minLocal = self.minLeaf
    def read(self, type_fmt, pos):
        self.fp.pos = pos
        return self.fp.read(type_fmt)
    def getPage(self, iTab):
        page = self.pages.get(iTab)
        if page is None:
            page = Page(self, iTab)
        return page
# primitive
    def get2byte(self):
       return self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
    def get4byte(self):
        return self.fp.read('uint:8') << 24 | self.fp.read('uint:8') << 16 |\
        self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
    def get_pagesize(self):
        self.fp.pos = 16*8
        return self.fp.read('uint:8') << 8 | self.fp.read('uint:8') << 16
    def getVarint(self):
        p = []
        p.append(self.fp.read('uint:8'))
        if not (p[0] & 0x80):
            return p[0], 1
        p.append(self.fp.read('uint:8'))
        if not (p[1] & 0x80):
            v = p[0] & 0x7f
            v <<= 7
            v |= p[1]
            return v, 2
        p.append(self.fp.read('uint:8'))
        if not (p[2] & 0x80):
            v = p[0] & 0x7f
            v <<= 7
            v |= p[1] & 0x7f
            v <<= 7
            v |= p[2] & 0x7f
            return v, 3
        raise Exception('too long')
    def set_cellsize(self, page):
        self.fp.pos = (page.pos + page.hdroffset + 3)*8
        page.nCell = self.get2byte()
    def get_pagetype(self, page):
        self.fp.pos = (page.hdroffset + self.pagesize * (page.pageno-1)) * 8
        return self.fp.read('uint:8')
    def find_cell_offset(self, iCell, page):
        mask = self.pagesize - 0x01
        celloffset = page.pos + page.hdroffset + 8 + page.childPtrSize
        if iCell == page.nCell:
            self.fp.pos = (celloffset-4)*8
            return self.fp.pos
        self.fp.pos = (celloffset + iCell*2)*8
        self.fp.pos = (page.pos + (mask & self.get2byte()))*8
        return self.fp.pos
MAX_DEPTH = 20
class Cursor(object):
    def __init__(self, fp, pgno):
        self.fp = fp
        self.pgno = pgno
        self.cell = None
        self.pages = [None]*MAX_DEPTH
        self.iCells = [None]*MAX_DEPTH
        self.depth = -1
    def moveToLeftMost(self):
        page = self.fp.getPage(self.pgno)
        self.depth += 1
        self.pages[self.depth] = page
        self.iCells[self.depth] = 0
        while not page.leaf:
            self.depth += 1  
            self.iCells[self.depth] = 0
            page = page.find_entry(self.fp, 0)
            self.pages[self.depth] = page
        assert(page.leaf)
        if page.nCell == 0:# for empty table
            raise StopIteration
        self.cell = page.find_entry(self.fp, self.iCells[self.depth])
    def moveNextLeaf(self):
        page = self.pages[self.depth]
        self.iCells[self.depth] += 1
        iCell = self.iCells[self.depth]
        if iCell > page.nCell - page.leaf:
            if self.depth == 0:
                raise StopIteration 
            self.depth -= 1
            self.pgno = page.pageno
            return self.moveNextLeaf()
        else:
            entry = page.find_entry(self.fp, iCell)
            entry.setcell(self)
            return self.cell
    def next(self):
        if self.cell is None:
            self.moveToLeftMost()
            return self.cell
        else:
            return self.moveNextLeaf() 
    def __iter__(self):
        return self
    def moveTo(self, iCell, pgno=None):
        if pgno is None:
            page = self.pages[self.depth]
        else:
            page = self.fp.getPage(pgno)
            self.pages[self.depth] = page
        assert(page.leaf)
        self.iCells[self.depth] = iCell
        self.cell = page.find_entry(self.fp, iCell)
class Page(object):
    def __init__(self, pager, pageno):
        if pageno == 1:
            self.hdroffset = HEADER_OFFSET_PAGE1
        else:
            self.hdroffset = 0
        self.pageno = pageno
        leaf = False
        childPtrSize = 4
        if LEAF & pager.get_pagetype(self):
            leaf = True
            childPtrSize = 0 
        self.leaf = leaf
        self.childPtrSize = childPtrSize

        self.pos = pager.pagesize*(pageno-1)
        self.maxLocal = pager.maxLeaf
        self.minLocal = pager.minLeaf
        self.nCell = None
        self.nField = None
        pager.set_cellsize(self)
        pager.pages[pageno] = self 
    def find_entry(self, fp, iCell):
        pos = fp.find_cell_offset(iCell, self)
        if not self.leaf:
            pgno = fp.get4byte() 
            return fp.getPage(pgno) 
        n = 0
        nPayload, tn =  fp.getVarint()
        n += tn
        intKey, tn = fp.getVarint()
        n += tn
        cell_hdr_offset = n
        keyoff, tn = fp.getVarint()
        n = tn
        stypes = []
        while n < keyoff:
            serial_type, tn = fp.getVarint()
            n += tn
            stypes.append(serial_type)
        if nPayload <= self.maxLocal:
            nLocal = nPayload
        else:
            minLocal = self.minLocal
            maxLocal = self.maxLocal
            surplus = minLocal + (nPayload - minLocal) % (fp.usableSize - 4)
            if surplus <= maxLocal:
                nLocal = surplus
            else:
                nLocal = minLocal
        return Cell(self, pos, nPayload, intKey, cell_hdr_offset, keyoff, stypes, nLocal)
    def setcell(self, cursor):
        cursor.pgno = self.pageno
        cursor.moveToLeftMost()
class Cell(object):
    def __init__(self, page, pos, nPayload, rowid, hdr_size, keyoffset, stypes, nLocal):
        self.parent = page
        offset = (hdr_size + keyoffset)*8
        self.pos = pos
        self.hdr = hdr_size*8
        self.nPayload = nPayload
        self.rowid = rowid
        self.stypes = stypes
        self.nLocal = nLocal

        self.nField = len(stypes)
        self.offsets = [offset]
        for serial_type in stypes:
            offset += get_fieldsize(serial_type)*8
            self.offsets.append(offset)
    def getvalue(self, fp, iField):
        serial_type = self.stypes[iField]
        offset = self.offsets[iField]
        payload_size = get_fieldsize(serial_type)
        if serial_type == 0 or serial_type == 10 or serial_type == 11:
            return None
        elif 1 <= serial_type and serial_type <= 6:
            return fp.read('int:%d' % (payload_size*8), self.pos + offset)
        else:
            page = self.parent
            if payload_size > self.nLocal:
                ovflSize = fp.usableSize - 4
                keyoffset = (offset - self.hdr)/8
                size = self.nLocal - keyoffset
                fp.fp.pos = self.pos + self.hdr + self.nLocal * 8
                npgno = fp.get4byte()
                buf = [fp.read('bytes:%d' % size, self.pos+offset)]
                nOverflow = (payload_size-self.nLocal+ovflSize-1)/ovflSize
                payload_size -= size
                i = keyoffset/ovflSize
                while payload_size > 0 and npgno != 0:
                    page = fp.getPage(npgno)
                    pos = (page.pos+4)*8
                    if payload_size > ovflSize:
                        nbytes = ovflSize
                        payload_size -= ovflSize
                    else:
                        nbytes = payload_size
                        payload_size = 0
                    buf.append(fp.read('bytes:%d' % nbytes, pos))
                    i+=1
                    fp.fp.pos = fp.pagesize*(npgno-1)*8
                    npgno = fp.get4byte() 
                if payload_size != 0:
                    raise Exception("database file is broken")
                return ''.join(buf)
            return fp.read('bytes:%d' % payload_size, self.pos+offset)
    def setcell(self, cursor):
        cursor.cell = self
def get_rootpageno(tname):
    rootpage, sql = TABLES[tname]
    return rootpage
def tables_add(row):
    if row[0] == 'table':
        TABLES[row[1]] = (row[3], row[4])
def printf(row):
    print row
def init_db(fname):
    iTab = 1
    fp = Pager(fname)
    cursor = Cursor(fp, iTab)
    for cell in cursor:
        tables_add([cell.getvalue(fp, i) for i in range(5)])
    return fp
def main(fname, tabname=None, *argv):
    fp = init_db(fname)
    if tabname is None:
        tabname = 'sqlite_master'
    iTab = get_rootpageno(tabname)
    cursor = Cursor(fp, iTab)
    cursor.moveToLeftMost()
    cell = cursor.cell
    if argv == ():
        indices = range(cell.nField)
    else:
        indices = []
        for idx in argv:
            indices.append(int(idx) % cell.nField)
        print tuple([cell.getvalue(fp, idx) for idx in indices])
    for cell in cursor:
        print tuple([cell.getvalue(fp, idx) for idx in indices])
class DB(object):
    def __init__(self, filename):
        self.pager = init_db(filename)
    def find(self, dic):
        cols = dic['cols']
        cursor = Cursor(self.pager, get_rootpageno(dic['from']))
        offset = dic.get('offset')
        if offset is not None:
            for i in range(offset):
                next(cursor)
        limit = dic.get('limit')
        n = 0
        for cell in cursor:
            if limit is not None and limit == n:
                raise StopIteration
            values = []
            for col in cols:
                values.append(cell.getvalue(self.pager, col))
            yield values
            n += 1
import sys
if __name__ == '__main__':
    argc = len(sys.argv)
    if argc < 2 :
        print "usage:%s <dabasefile> <table>? <cols>?" % sys.argv[0]
        sys.exit(1)
    main(*sys.argv[1:])

Recommended Posts

lecteur sqlite3
Excel-> pandas-> sqlite
Sqlite en Python
Aide-mémoire SQLite3