sqlite3 reader

I used to make a python version of sqlite3. I used to call it pysql, but for some reason I deleted the latest version. However, the very early ones remained in the blog, so I will post them here. The only function at this time is the function to read the db file. The version of sqlite3 used to create the db file to read is:

$ sqlite3 --version
3.7.9 2011-11-01 00:52:41 c7c6050ef060877ebe77b41d959e9df13f8c9b5e

Write a fruits table similar to the following to the test.db file.

$ sqlite3 test.db
SQLite version 3.7.9 2011-11-01 00:52:41
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table fruits(name string, value integer);
sqlite> insert into fruits values('apple', 100);
sqlite> insert into fruits values('orange', 130);
sqlite> .q

Let's read this using pysql.

$ ./pysql.py test.db fruits 0
('apple',)
('orange',)
$ ./pysql.py test.db fruits 1
(100,)
(130,)
$ ./pysql.py test.db fruits 
('orange', 130)
$ ./pysql.py test.db fruits 0 1
('apple', 100)
('orange', 130)
$ ./pysql.py test.db fruits 1 0
(100, 'apple')
(130, 'orange')

I managed to move. It seems that the program will stop in one shot if you make a mistake in the table name etc., but the source code is as follows.

pysql.py


#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import bitstring
HEADER_OFFSET_PAGE1 = 100
#page type
INTKEY = 0x01
ZERO_DATA = 0x02
LEAF_DATA = 0x04
LEAF = 0x08
TABLES = {'sqlite_master':(1, 
"""CREATE sqlite_master(
                type text, 
                name text, 
                tbl_name text, 
                rootpage integer, 
                sql text)'
""")}
SIZE = [0,1,2,3,4,6,8,8,0,0,0,0]
def get_fieldsize(serial_type):
    if serial_type >= 12:
        return (serial_type-12)/2
    else:
        return SIZE[serial_type];
class Pager(object):
    def __init__(self, fname):
        self.fp = bitstring.ConstBitStream(filename=fname)
        self.pagesize = self.get_pagesize()
        self.pages = {}
        self.fp.pos = 20*8

        nReserve = self.fp.read('uint:8')
        self.usableSize = self.pagesize - nReserve
        self.maxLeaf = self.usableSize - 35
        self.minLeaf = (self.usableSize - 12) * 32/255 - 23
        self.maxLocal = (self.usableSize - 12) * 64/255 - 23
        self.minLocal = self.minLeaf
    def read(self, type_fmt, pos):
        self.fp.pos = pos
        return self.fp.read(type_fmt)
    def getPage(self, iTab):
        page = self.pages.get(iTab)
        if page is None:
            page = Page(self, iTab)
        return page
# primitive
    def get2byte(self):
       return self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
    def get4byte(self):
        return self.fp.read('uint:8') << 24 | self.fp.read('uint:8') << 16 |\
        self.fp.read('uint:8') << 8 | self.fp.read('uint:8')
    def get_pagesize(self):
        self.fp.pos = 16*8
        return self.fp.read('uint:8') << 8 | self.fp.read('uint:8') << 16
    def getVarint(self):
        p = []
        p.append(self.fp.read('uint:8'))
        if not (p[0] & 0x80):
            return p[0], 1
        p.append(self.fp.read('uint:8'))
        if not (p[1] & 0x80):
            v = p[0] & 0x7f
            v <<= 7
            v |= p[1]
            return v, 2
        p.append(self.fp.read('uint:8'))
        if not (p[2] & 0x80):
            v = p[0] & 0x7f
            v <<= 7
            v |= p[1] & 0x7f
            v <<= 7
            v |= p[2] & 0x7f
            return v, 3
        raise Exception('too long')
    def set_cellsize(self, page):
        self.fp.pos = (page.pos + page.hdroffset + 3)*8
        page.nCell = self.get2byte()
    def get_pagetype(self, page):
        self.fp.pos = (page.hdroffset + self.pagesize * (page.pageno-1)) * 8
        return self.fp.read('uint:8')
    def find_cell_offset(self, iCell, page):
        mask = self.pagesize - 0x01
        celloffset = page.pos + page.hdroffset + 8 + page.childPtrSize
        if iCell == page.nCell:
            self.fp.pos = (celloffset-4)*8
            return self.fp.pos
        self.fp.pos = (celloffset + iCell*2)*8
        self.fp.pos = (page.pos + (mask & self.get2byte()))*8
        return self.fp.pos
MAX_DEPTH = 20
class Cursor(object):
    def __init__(self, fp, pgno):
        self.fp = fp
        self.pgno = pgno
        self.cell = None
        self.pages = [None]*MAX_DEPTH
        self.iCells = [None]*MAX_DEPTH
        self.depth = -1
    def moveToLeftMost(self):
        page = self.fp.getPage(self.pgno)
        self.depth += 1
        self.pages[self.depth] = page
        self.iCells[self.depth] = 0
        while not page.leaf:
            self.depth += 1  
            self.iCells[self.depth] = 0
            page = page.find_entry(self.fp, 0)
            self.pages[self.depth] = page
        assert(page.leaf)
        if page.nCell == 0:# for empty table
            raise StopIteration
        self.cell = page.find_entry(self.fp, self.iCells[self.depth])
    def moveNextLeaf(self):
        page = self.pages[self.depth]
        self.iCells[self.depth] += 1
        iCell = self.iCells[self.depth]
        if iCell > page.nCell - page.leaf:
            if self.depth == 0:
                raise StopIteration 
            self.depth -= 1
            self.pgno = page.pageno
            return self.moveNextLeaf()
        else:
            entry = page.find_entry(self.fp, iCell)
            entry.setcell(self)
            return self.cell
    def next(self):
        if self.cell is None:
            self.moveToLeftMost()
            return self.cell
        else:
            return self.moveNextLeaf() 
    def __iter__(self):
        return self
    def moveTo(self, iCell, pgno=None):
        if pgno is None:
            page = self.pages[self.depth]
        else:
            page = self.fp.getPage(pgno)
            self.pages[self.depth] = page
        assert(page.leaf)
        self.iCells[self.depth] = iCell
        self.cell = page.find_entry(self.fp, iCell)
class Page(object):
    def __init__(self, pager, pageno):
        if pageno == 1:
            self.hdroffset = HEADER_OFFSET_PAGE1
        else:
            self.hdroffset = 0
        self.pageno = pageno
        leaf = False
        childPtrSize = 4
        if LEAF & pager.get_pagetype(self):
            leaf = True
            childPtrSize = 0 
        self.leaf = leaf
        self.childPtrSize = childPtrSize

        self.pos = pager.pagesize*(pageno-1)
        self.maxLocal = pager.maxLeaf
        self.minLocal = pager.minLeaf
        self.nCell = None
        self.nField = None
        pager.set_cellsize(self)
        pager.pages[pageno] = self 
    def find_entry(self, fp, iCell):
        pos = fp.find_cell_offset(iCell, self)
        if not self.leaf:
            pgno = fp.get4byte() 
            return fp.getPage(pgno) 
        n = 0
        nPayload, tn =  fp.getVarint()
        n += tn
        intKey, tn = fp.getVarint()
        n += tn
        cell_hdr_offset = n
        keyoff, tn = fp.getVarint()
        n = tn
        stypes = []
        while n < keyoff:
            serial_type, tn = fp.getVarint()
            n += tn
            stypes.append(serial_type)
        if nPayload <= self.maxLocal:
            nLocal = nPayload
        else:
            minLocal = self.minLocal
            maxLocal = self.maxLocal
            surplus = minLocal + (nPayload - minLocal) % (fp.usableSize - 4)
            if surplus <= maxLocal:
                nLocal = surplus
            else:
                nLocal = minLocal
        return Cell(self, pos, nPayload, intKey, cell_hdr_offset, keyoff, stypes, nLocal)
    def setcell(self, cursor):
        cursor.pgno = self.pageno
        cursor.moveToLeftMost()
class Cell(object):
    def __init__(self, page, pos, nPayload, rowid, hdr_size, keyoffset, stypes, nLocal):
        self.parent = page
        offset = (hdr_size + keyoffset)*8
        self.pos = pos
        self.hdr = hdr_size*8
        self.nPayload = nPayload
        self.rowid = rowid
        self.stypes = stypes
        self.nLocal = nLocal

        self.nField = len(stypes)
        self.offsets = [offset]
        for serial_type in stypes:
            offset += get_fieldsize(serial_type)*8
            self.offsets.append(offset)
    def getvalue(self, fp, iField):
        serial_type = self.stypes[iField]
        offset = self.offsets[iField]
        payload_size = get_fieldsize(serial_type)
        if serial_type == 0 or serial_type == 10 or serial_type == 11:
            return None
        elif 1 <= serial_type and serial_type <= 6:
            return fp.read('int:%d' % (payload_size*8), self.pos + offset)
        else:
            page = self.parent
            if payload_size > self.nLocal:
                ovflSize = fp.usableSize - 4
                keyoffset = (offset - self.hdr)/8
                size = self.nLocal - keyoffset
                fp.fp.pos = self.pos + self.hdr + self.nLocal * 8
                npgno = fp.get4byte()
                buf = [fp.read('bytes:%d' % size, self.pos+offset)]
                nOverflow = (payload_size-self.nLocal+ovflSize-1)/ovflSize
                payload_size -= size
                i = keyoffset/ovflSize
                while payload_size > 0 and npgno != 0:
                    page = fp.getPage(npgno)
                    pos = (page.pos+4)*8
                    if payload_size > ovflSize:
                        nbytes = ovflSize
                        payload_size -= ovflSize
                    else:
                        nbytes = payload_size
                        payload_size = 0
                    buf.append(fp.read('bytes:%d' % nbytes, pos))
                    i+=1
                    fp.fp.pos = fp.pagesize*(npgno-1)*8
                    npgno = fp.get4byte() 
                if payload_size != 0:
                    raise Exception("database file is broken")
                return ''.join(buf)
            return fp.read('bytes:%d' % payload_size, self.pos+offset)
    def setcell(self, cursor):
        cursor.cell = self
def get_rootpageno(tname):
    rootpage, sql = TABLES[tname]
    return rootpage
def tables_add(row):
    if row[0] == 'table':
        TABLES[row[1]] = (row[3], row[4])
def printf(row):
    print row
def init_db(fname):
    iTab = 1
    fp = Pager(fname)
    cursor = Cursor(fp, iTab)
    for cell in cursor:
        tables_add([cell.getvalue(fp, i) for i in range(5)])
    return fp
def main(fname, tabname=None, *argv):
    fp = init_db(fname)
    if tabname is None:
        tabname = 'sqlite_master'
    iTab = get_rootpageno(tabname)
    cursor = Cursor(fp, iTab)
    cursor.moveToLeftMost()
    cell = cursor.cell
    if argv == ():
        indices = range(cell.nField)
    else:
        indices = []
        for idx in argv:
            indices.append(int(idx) % cell.nField)
        print tuple([cell.getvalue(fp, idx) for idx in indices])
    for cell in cursor:
        print tuple([cell.getvalue(fp, idx) for idx in indices])
class DB(object):
    def __init__(self, filename):
        self.pager = init_db(filename)
    def find(self, dic):
        cols = dic['cols']
        cursor = Cursor(self.pager, get_rootpageno(dic['from']))
        offset = dic.get('offset')
        if offset is not None:
            for i in range(offset):
                next(cursor)
        limit = dic.get('limit')
        n = 0
        for cell in cursor:
            if limit is not None and limit == n:
                raise StopIteration
            values = []
            for col in cols:
                values.append(cell.getvalue(self.pager, col))
            yield values
            n += 1
import sys
if __name__ == '__main__':
    argc = len(sys.argv)
    if argc < 2 :
        print "usage:%s <dabasefile> <table>? <cols>?" % sys.argv[0]
        sys.exit(1)
    main(*sys.argv[1:])

Recommended Posts

sqlite3 reader
Excel-> pandas-> sqlite
Sqlite in python
SQLite3 cheat sheet