I tried to get data from AS/400 quickly using pypyodbc: Preparation 1

In the previous article, "I tried to get data from AS/400 quickly using pypyodbc", I confirmed that IBM i can be reached over ODBC. That gave me a lot of ideas for things to try, but on reflection they all need some groundwork first, so this time I built that groundwork.

It is called "Preparation 1" because it is only half done: so far only the input side is handled. The output side is hard to verify without an actual machine, so I have skipped it this time.

Experiment environment

Helper module

I expect to reuse these pieces later, so I am packing them into a single file (module) called "sql_helper.py". These are basic features that an existing package probably already provides, but I deliberately reinvented the wheel as a way of reviewing Python.

It looks like this for now.

sql_helper.py


def _load_lines(filename, encoding='utf-8'):
    # Use a context manager so the file is closed after reading
    with open(filename, 'r', encoding=encoding) as f:
        return f.readlines()


def load_raw_string(filename, encoding='utf-8', rstrip=True, lstrip=False):
    def _echo(x): return x

    if   (    rstrip  and      lstrip): func = str.strip
    elif (not rstrip  and      lstrip): func = str.lstrip
    elif (    rstrip  and  not lstrip): func = str.rstrip    # default
    else:                               func = _echo         # dummy

    if  rstrip: term = "\n"  # rstrip() with no argument also removes "\n", so put it back on join
    else:       term = ""

    s = _load_lines(filename, encoding)
    return term.join( list(map(lambda x: func(x), s)) )


def load_sql(filename, encoding='utf-8', lstrip=False):
    sts = []
    for st in strip_comment( load_raw_string(filename, encoding, True, lstrip) ).split(";\n"):
        if (st.isspace()  or  st == "" ): continue
        sts.append(st.strip("; \t\n") )
    return sts


def load_config(filename, encoding='utf-8'):
    cf = {}
    for line in strip_comment(load_raw_string(filename, encoding)).split("\n"):
        if ( line == ""  or  line.isspace() ): continue
        key_, val_ = line.split("=")
        cf[ key_.strip() ] = val_.strip()
    return cf

# ... continues a little further below ...

I will explain briefly.

_load_lines()

Reads the specified file and simply returns a list with one element per line. I just did not want to write `open(xxx, ...).readlines()` every time.

load_raw_string()

Takes the list returned by `_load_lines()` above, applies lstrip() and/or rstrip() to each line, then rebuilds a single string and returns it. I casually decided it should be possible to choose whether to strip whitespace at the start of each line and whether to strip it at the end, and the code grew as a result. `_load_lines()` above was split out for that reason. (I also wanted the return statement to fit on one line.)

Depending on the parameters, func is set to str.strip, str.lstrip, str.rstrip, or _echo (which just returns its input).


If you call str.rstrip or str.strip with no arguments, the newline at the end of each line is removed as well. So when trailing whitespace is being stripped, `term` is set to a newline, otherwise to an empty string, and the return statement rejoins the lines with it.
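
The newline behaviour is easy to check in isolation. The following is a minimal sketch, separate from sql_helper.py; the sample lines are made up for illustration.

```python
# Lines as they come back from reading a file: each keeps its "\n".
lines = ["select *  \n", "  from T1\t\n", "  where A = 1\n"]

# str.rstrip() with no argument strips all trailing whitespace,
# and "\n" counts as whitespace, so the line breaks are lost too.
stripped = [line.rstrip() for line in lines]

# Joining with "\n" puts the line breaks back between the lines.
rejoined = "\n".join(stripped)

# When nothing is stripped, each line still carries its own "\n",
# so the lines are joined with an empty string instead.
untouched = "".join(lines)

print(repr(rejoined))
print(repr(untouched))
```

Note that joining with "\n" restores the breaks between lines only, so the final line ends up without a trailing newline.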

load_sql()

Reads SQL from filename. If the text contains ";" followed by a line break, it is split there and returned as a list of SQL statements, and that is all.

That was the idea, at least, but making it work meant writing helpers for the helper.
For example, splitting on a ";\n" inside a comment would be bad. So would splitting on a ";\n" inside quotes.
There are also compound cases: with `...; /* block comment */`, the text no longer matches ";\n" unless the trailing whitespace is removed again after the comment is removed. And so on.

What it finally does is this:
1. Remove trailing whitespace using load_raw_string()
2. Remove comments with strip_comment() (described later), then remove the trailing whitespace left behind by the comment removal
3. Only then split into individual SQL statements on ";\n"
4. Remove the ";" itself from the end of each statement
5. Collect the results of step 4 into a list and return it
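
Steps 3 to 5 can be sketched on their own. This is a simplified illustration rather than the code in sql_helper.py: `split_statements` is a hypothetical name, and the input is assumed to be text that has already had its comments and trailing whitespace removed.

```python
def split_statements(text):
    """Split comment-free SQL text into statements on ';' + newline."""
    statements = []
    for chunk in text.split(";\n"):
        # Drop the remaining ";" and surrounding whitespace.
        cleaned = chunk.strip("; \t\n")
        # Skip empty statements such as a lone ";" line.
        if cleaned == "":
            continue
        statements.append(cleaned)
    return statements


# Text as it might look after steps 1 and 2.
text = "select * from T1;\n\nselect * from T2\n  where A = 1;\n;\nselect * from T3;"
result = split_statements(text)
print(result)
```

A ";" that is not followed directly by a line break does not split the text, which is why the trailing whitespace has to be gone before this step.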

It might have been quicker to just write a simple SQL lexer.


load_config()

When you read a file in the following format ...

sample_connection.txt
```text

/* supports sql style comments */
  driver = {iSeries Access ODBC Driver}
    system = 127.0.0.1	
	uid    = USER01      
	pwd    = USER01   	

DefaultLibraries = MYLIB,TESTLIB1,TESTLIB2
```

... it is converted into a dictionary like this.

{'DefaultLibraries': 'MYLIB,TESTLIB1,TESTLIB2',
 'driver': '{iSeries Access ODBC Driver}',
 'pwd': 'USER01',
 'system': '127.0.0.1',
 'uid': 'USER01'}

If you pass this dictionary to the pypyodbc.connect() function, you don't have to write out the parameters one by one.

odbctest.py


from   pypyodbc import connect
import sql_helper as helper

config = helper.load_config( "sample_connection.txt" )

with connect( **config ) as connection:
    # Database access process...
    pass
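
For tools that expect a single ODBC connection string instead of keyword arguments, the same dictionary can be flattened into `key=value` pairs separated by semicolons. This is only a sketch: `to_connection_string` is a hypothetical helper that belongs to neither pypyodbc nor sql_helper.py, and the values are the sample ones shown above.

```python
def to_connection_string(config):
    """Join a config dictionary into 'key=value;key=value;...' form."""
    return ";".join(f"{key}={value}" for key, value in config.items())


# Sample values copied from sample_connection.txt.
config = {
    "driver": "{iSeries Access ODBC Driver}",
    "system": "127.0.0.1",
    "uid": "USER01",
    "pwd": "USER01",
}
connection_string = to_connection_string(config)
print(connection_string)
```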

strip_comment()

The source is getting long, but this is the comment removal function used above.

sql_helper.py


from  enum  import Enum, auto
class States(Enum):
    Statement = auto()
    Quoted    = auto()
    Comment   = auto()
    End       = auto()

_LimitLen = 999999999

def _singlequote_begin(s, b):
    p = s.find("'", b)
    return p   if (p >= 0) else _LimitLen

def _singlequote_end(s, b):
    p = s.find("'", b)
    if (p >= 0  and  p == s.find("''", b)):  return _singlequote_end(s, p + 2)    # escaped quote (''): keep searching recursively
    return (_next, p, 0, States.Statement)   if (p >= 0) else (None, _LimitLen, 0, States.End)

def _doublequote_begin(s, b):
    p = s.find('"', b)
    return p   if (p >= 0) else _LimitLen

def _doublequote_end(s, b):
    p = s.find('"', b)
    return (_next, p, 0, States.Statement)  if (p >= 0) else (None, _LimitLen, 0, States.End)

def _block_comment_begin(s, b):
    p = s.find("/*", b)
    return p + 1  if (p >= 0) else _LimitLen

def _block_comment_end (s, b):
    p = s.find("*/", b)
    return (_next, p + len("*/") - 1, len("*/") -1, States.Statement)  if (p >= 0) else (None, _LimitLen, 0, States.End)

def _line_comment_begin(s, b):
    p = s.find("--", b)
    return p + 1  if (p >= 0) else _LimitLen

def _line_comment_end(s, b):
    p =  s.find("\n", b)
    return (_next, p + len("\n") - 1, len("\n") - 1, States.Statement)   if (p >= 0) else (None, _LimitLen, 0, States.End)


def _quote_begin(s, b):
    next_state = States.Quoted
    sq, dq = _singlequote_begin(s, b), _doublequote_begin(s, b)
    if  (min(sq, dq) == _LimitLen): next_state = States.End
    return (_singlequote_end, sq, 0, next_state)  if (sq < dq) else (_doublequote_end, dq, 0, next_state)

def _comment_begin(s, b):
    next_state = States.Comment
    bc, lc = _block_comment_begin(s, b), _line_comment_begin(s, b)
    if  (min(bc, lc) == _LimitLen): next_state = States.End    # no comment left
    return (_line_comment_end, lc, 0, next_state)    if (lc < bc) else  (_block_comment_end, bc, 0, next_state)

def _next(s, b):
    q_func, q_pos, q_ad, q_st = _quote_begin(s, b)
    c_func, c_pos, c_ad, c_st = _comment_begin(s, b)
    return  (q_func, q_pos, 0, q_st)  if (q_pos < c_pos) else (c_func, c_pos, 0, c_st)


def strip_comment( st ):
    # Short-circuit: nothing to do for empty input or text without comments
    if st is None  or  len( st.strip() ) == 0: return ""
    if ("/*" not in st)  and  ("--" not in st): return  "\n".join( list ( map(lambda x: x.rstrip(), st.split("\n"))))

    chars = list( st )

    comments = _find_comment_pos( st )
    comments.reverse()
    for c in comments:  del chars[ c[0]: c[1]]

    return "\n".join( list( map(lambda x: x.rstrip() , "".join( chars ).split("\n")) ) )


def _find_comment_pos( st ):
    cur   = -1
    begin =  0
    comments = []

    state = States.Statement
    pstate = None
    func = _next

    while (True):
        func, pos, adv, state = func(st, cur + 1)

        #Search end
        if  ( state == States.End):
            if  (pstate == States.Comment):
                comments.append((begin, len(st)))
            break

        cur = pos

        # Quote begins -> skip the rest of the loop body
        if  (state == States.Quoted):
            pstate = state
            continue

        # end comment
        if  (pstate == States.Comment):
            comments.append((begin, pos + adv))
            pstate = state
            continue

        # begin comment
        if  (state == States.Comment):
            begin = pos - 1  # the *_comment_begin helpers return the match position + 1

        pstate = state

    return comments

I no longer remember exactly what I wrote, so I will only explain it briefly.

Overview

The main functions are strip_comment() and _find_comment_pos(), which searches for comment positions and returns a list of (start position, end position). The functions and objects lined up before them are helpers used by _find_comment_pos(). These miscellaneous functions were originally defined as inner functions of _find_comment_pos(), but when `func, pos, adv, state = func(st, cur + 1)` was executed, the first func (which receives the next search function) seemed to end up as a 3-item tuple instead of a normal function object. Working out how to deal with that looked difficult, so I simply changed them to normal functions. (That is why extra functions are scattered around the module...)

_find_comment_pos()

The local variable `func` holds the function to use for the next search. The same thing could be done by branching on the current state and the matched characters ("/*", "'", a line break, and so on), but the conditional branching looked like it would get out of hand, so each search returns an appropriate search function for the characters it matched. Example: if "/*" is found, the next thing to look for is necessarily "*/", so the function that searches for it, _block_comment_end(), is chosen.

At first I thought returning the hit position along with it would be enough, but it turned out I wanted this piece of information too, and needed that one as well, and every function ended up returning 4 values... After that, I adjusted the numbers so that a comment position can be used directly as a slice, and packed them into a list. (I have forgotten the details..)
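
The idea of each search function returning the function to call next can be shown with a much smaller example. This is a sketch under simplifying assumptions, not a reduction of the real code: it handles block comments only, ignores quotes and line comments, and all three function names are made up for illustration. Each function returns the next function, the hit position, and the position to resume from.

```python
def find_open(s, start):
    """Look for '/*'; on a hit, hand over to find_close."""
    p = s.find("/*", start)
    if p < 0:
        return None, len(s), len(s)      # nothing left to search
    return find_close, p, p + 2


def find_close(s, start):
    """Look for '*/'; on a hit, hand back to find_open."""
    p = s.find("*/", start)
    if p < 0:
        return None, len(s), len(s)      # unterminated comment runs to the end
    return find_open, p, p + 2


def block_comment_spans(s):
    """Return a (begin, end) slice for every block comment in s."""
    spans = []
    begin = None                         # start of the comment currently open
    func, resume = find_open, 0
    while func is not None:
        current = func
        func, hit, resume = current(s, resume)
        if current is find_open:
            begin = hit if func is not None else None
        else:
            spans.append((begin, resume))
            begin = None
    return spans


text = "a /* x */ b /* y */ c"
spans = block_comment_spans(text)
print(spans, [text[b:e] for b, e in spans])
```

Even this reduced version needs three return values per function, which gives a feel for how the real helpers ended up returning four.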

strip_comment()

Now that the comment positions are known, all that is left is to remove the comments with slices!

...except that the string class has no method for removing the characters at a slice position!

There was no way around it, so I convert the string to a list of characters with `chars = list( st )` and then run `del chars[begin:end]`. With that I finally obtained the long-awaited comment removal (if not the long-awaited Ice Sword).

As for the last return statement: to also get rid of the whitespace left at the end of a line once a comment has been removed (the "Text; /*comment*/\n" pattern), I ended up writing yet another roundabout step.
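
The deletion step can be tried separately from the search. The sketch below uses a hypothetical `remove_spans` function and computes the comment positions with `str.index` only to keep the example short; it is not how sql_helper.py finds them.

```python
def remove_spans(text, spans):
    """Delete each (begin, end) slice from text, then strip line ends."""
    chars = list(text)                   # str is immutable, a list is not
    # Delete from the back so that earlier positions stay valid.
    for begin, end in sorted(spans, reverse=True):
        del chars[begin:end]
    # Removing a comment can leave whitespace at the end of its line.
    return "\n".join(line.rstrip() for line in "".join(chars).split("\n"))


text = "select 1; /* first */\nselect 2; -- second\n"
line_comment = text.index("--")
spans = [
    (text.index("/*"), text.index("*/") + 2),           # block comment
    (line_comment, text.index("\n", line_comment)),     # line comment, newline kept
]
cleaned = remove_spans(text, spans)
print(repr(cleaned))
```

Deleting in reverse order matters: removing the first comment first would shift every later position to the left.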

Finally

Here are the files used for testing and the test code. (sample_connection.txt is omitted because it is already shown above.)

sample_multi_statement.sql


select * from SYSLIBL 
  where TYPE not like = "';%"   ;/* comment */

select * from SYSTABLES  ;   /* "" ' '' ; ;
" */

  ;  -- empty statement

select * from SYSCOLUMNS        ;
select * from SYSIBM.SYSDUMMY1;

For checking the short-circuit evaluation in strip_comment()

sample_multi_statement2.sql


select * from SYSLIBL 
  where TYPE not like = "';%"   ;

select * from SYSTABLES  ;   

;

select * from SYSCOLUMNS        ;	
select * from SYSIBM.SYSDUMMY1;

sql_helper.py


def test_loading():
    from pprint import pprint

    file = "sample_multi_statement.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_multi_statement2.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_config.txt"
    print( f'\ntest load_config("{file}") ----------------------------')
    config = load_config( file )
    pprint( config )

Just reading the input took more code than I expected. I started writing this with IBM i in mind, but I think what is here so far can also serve as preprocessing for other DBMSs.

Next time, we will start on the output side.
