In the previous article, "I tried to get data from AS/400 quickly using pypyodbc", I got as far as connecting to IBM i over ODBC. That gave me plenty of ideas for what to do next, but on reflection most of them need some groundwork first, so this time I built that groundwork.
This is "Part 1" because the work is still only half done: so far only the input side is handled. The output side is hard to verify without the actual machine, so I have skipped it for now.
I expect to reuse all of this later, so I am packing it into a single file (module) called "sql_helper.py". These are basic features that an existing package probably provides, but I reinvented the wheel on purpose as a way of reviewing Python.
It looks like this for now.
sql_helper.py
def _load_lines(filename, encoding='utf-8'):
    # Read the file and return a list with one element per line.
    # "with" makes sure the file handle is closed again.
    with open(filename, 'r', -1, encoding) as f:
        return [ line for line in f ]

def load_raw_string(filename, encoding='utf-8', rstrip=True, lstrip=False):
    def _echo(x): return x
    if   (    rstrip and     lstrip): func = str.strip
    elif (not rstrip and     lstrip): func = str.lstrip
    elif (    rstrip and not lstrip): func = str.rstrip   # default
    else:                             func = _echo        # dummy
    if rstrip: term = "\n"   # rstrip() with no argument also removes "\n"
    else:      term = ""
    s = _load_lines(filename, encoding)
    return term.join( list(map(lambda x: func(x), s)) )

def load_sql(filename, encoding='utf-8', lstrip=False):
    sts = []
    for st in strip_comment( load_raw_string(filename, encoding, True, lstrip) ).split(";\n"):
        if (st.isspace() or st == "" ): continue
        sts.append(st.strip("; \t\n") )
    return sts

def load_config(filename, encoding='utf-8'):
    cf = {}
    for line in strip_comment(load_raw_string(filename, encoding)).split("\n"):
        if ( line == "" or line.isspace() ): continue
        key_, val_ = line.split("=")
        cf[ key_.strip() ] = val_.strip()
    return cf

# ...Will continue a little longer...
I will explain briefly.
_load_lines()
Reads the specified file and simply returns a list with one element per line. I just didn't want to write `open(xxx, ...).readline()` every time.
load_raw_string()
Takes the list obtained from `_load_lines()` above, applies lstrip() and/or rstrip() to each line, then joins the result back into a single string and returns it.
Should the caller be able to choose whether to strip whitespace at the start of each line and at the end of each line? I casually decided yes, and the code grew. Everything before the call to `_load_lines()` is what was added for that. (Another reason is that I wanted to finish the return statement in one line.)
Depending on the parameters, `func` is set to one of `str.lstrip`, `str.rstrip`, `str.strip`, or `_echo` (which just returns its input).
Calling `str.rstrip` or `str.strip` with no arguments also removes the newline at the end of the line. So when end-of-line stripping is requested, `term` is set to a newline; otherwise it is set to an empty string. The return statement then joins the lines with `term`.
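The reason for `term` is easy to check in isolation. The following is a minimal sketch, not part of sql_helper.py; the `lines` list is made-up sample data standing in for what a file read line by line looks like.

```python
# Hypothetical sample input: two lines as they come out of a file.
lines = ["  select *  \n", "  from T  \n"]

# rstrip() with no argument removes the trailing newline as well,
# so the newline has to be put back when joining.
rstripped = "\n".join(line.rstrip() for line in lines)

# lstrip() leaves the newline at the end of each line alone,
# so joining with an empty string is enough.
lstripped = "".join(line.lstrip() for line in lines)

print(repr(rstripped))
print(repr(lstripped))
```

Note that the rstrip variant also loses the newline after the very last line, which is harmless for the splitting done later.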
load_sql()
Gets the SQL from filename. If it contains ";" followed by a line feed, split there and return a list of SQL statements, and that's it.
That was the plan, anyway. To actually make it work, I ended up writing helpers for the helper.
For example, it would be bad if the text were cut at a ";\n" inside a comment, right? Or at a ";\n" inside quotes?
And as a compound case, with something like `...; /* block comment */`, the text will not match ";\n" unless the whitespace at the end of the line is removed again after the comment is removed! And so on...
What I finally do is this:
1. Remove trailing whitespace using load_raw_string()
2. Remove comments with strip_comment() (described later), then remove the whitespace left at the end of lines by removing the comments
3. Only then split into single SQL statements at ";\n"
4. Remove the ";" itself from the end of each statement
5. Repack the results of step 4 into a list and return it
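Steps 3 to 5 can be tried on their own with text that is already free of comments. This is only a sketch of that part; `split_statements` is a hypothetical name used for illustration, and the sample string is made up.

```python
def split_statements(text):
    """Split comment-free SQL text on ";\n" and drop empty statements.

    Hypothetical helper: it mirrors steps 3 to 5 only and does not
    protect against ";\n" appearing inside a quoted string.
    """
    statements = []
    for chunk in text.split(";\n"):
        if chunk == "" or chunk.isspace():
            continue                              # empty statement
        statements.append(chunk.strip("; \t\n"))  # drop the trailing ";"
    return statements

sample = "select 1 from A;\n;\nselect 2\n  from B;"
print(split_statements(sample))
```

The lone ";" line in the sample produces an empty chunk, which is why the emptiness check has to come before the append.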
It might actually have been quicker to just write a simple SQL lexical analyzer.
load_config()
When you read a file in the following format ...
#### **`sample_connection.txt`**
```text
/* supports sql style comments */
driver = {iSeries Access ODBC Driver}
system = 127.0.0.1
uid = USER01
pwd = USER01
DefaultLibraries = MYLIB,TESTLIB1,TESTLIB2
```
... it is converted into a dictionary like this.
{'DefaultLibraries': 'MYLIB,TESTLIB1,TESTLIB2',
'driver': '{iSeries Access ODBC Driver}',
'pwd': 'USER01',
'system': '127.0.0.1',
'uid': 'USER01'}
If you pass this dictionary to the `pypyodbc.connect()` function, you don't have to write the parameters out one by one.
odbctest.py
from pypyodbc import connect
import sql_helper as helper

config = helper.load_config( "sample_connection.txt" )
with connect( **config ) as connection:
    # Database access process...
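One thing to watch in `load_config()`: `key_, val_ = line.split("=")` raises a ValueError as soon as a value itself contains "=". Below is a minimal sketch of a more tolerant parse. It assumes that only the first "=" separates key and value, which is my assumption and not something the original code states; `parse_config_lines` and the `note` entry are hypothetical.

```python
def parse_config_lines(lines):
    """Build a dict from "key = value" lines, splitting on the first "=" only."""
    config = {}
    for line in lines:
        if not line.strip():
            continue                       # skip blank lines
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"no '=' in config line: {line!r}")
        config[key.strip()] = value.strip()
    return config

sample = [
    "driver = {iSeries Access ODBC Driver}",
    "",
    "system = 127.0.0.1",
    "note = a=b",          # hypothetical entry whose value contains "="
]
print(parse_config_lines(sample))
```

`str.partition` always returns three parts, so a missing "=" can be reported with a clear message instead of an unpacking error.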
strip_comment()
The source is getting long, but this is the comment removal function that has been used throughout.
sql_helper.py
from enum import Enum, auto

class States(Enum):
    Statement = auto()
    Quoted = auto()
    Comment = auto()
    End = auto()

_LimitLen = 999999999

def _singlequote_begin(s, b):
    p = s.find("'", b)
    return p if (p >= 0) else _LimitLen

def _singlequote_end(s, b):
    p = s.find("'", b)
    # An escaped quote ('') does not end the literal: skip it and search again.
    if (p >= 0 and p == s.find("''", b)): return _singlequote_end(s, p + 2)   # find recursive
    return (_next, p, 0, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)

def _doublequote_begin(s, b):
    p = s.find('"', b)
    return p if (p >= 0) else _LimitLen

def _doublequote_end(s, b):
    p = s.find('"', b)
    return (_next, p, 0, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)

def _block_comment_begin(s, b):
    p = s.find("/*", b)
    return p + 1 if (p >= 0) else _LimitLen

def _block_comment_end(s, b):
    p = s.find("*/", b)
    return (_next, p + len("*/") - 1, len("*/") - 1, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)

def _line_comment_begin(s, b):
    p = s.find("--", b)
    return p + 1 if (p >= 0) else _LimitLen

def _line_comment_end(s, b):
    p = s.find("\n", b)
    return (_next, p + len("\n") - 1, len("\n") - 1, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)

def _quote_begin(s, b):
    next_state = States.Quoted
    sq, dq = _singlequote_begin(s, b), _doublequote_begin(s, b)
    if (min(sq, dq) == _LimitLen): next_state = States.End
    return (_singlequote_end, sq, 0, next_state) if (sq < dq) else (_doublequote_end, dq, 0, next_state)

def _comment_begin(s, b):
    next_state = States.Comment
    bc, lc = _block_comment_begin(s, b), _line_comment_begin(s, b)
    # No comment marker left: report End, in the same way as _quote_begin.
    if (min(bc, lc) == _LimitLen): next_state = States.End
    return (_line_comment_end, lc, 0, next_state) if (lc < bc) else (_block_comment_end, bc, 0, next_state)

def _next(s, b):
    # Whichever comes first, a quote or a comment, decides the next search.
    q_func, q_pos, q_ad, q_st = _quote_begin(s, b)
    c_func, c_pos, c_ad, c_st = _comment_begin(s, b)
    return (q_func, q_pos, 0, q_st) if (q_pos < c_pos) else (c_func, c_pos, 0, c_st)

def strip_comment( st ):
    # Short-circuit evaluation
    if st is None or len( st.strip() ) == 0: return ""
    if ("/*" not in st) and ("--" not in st): return "\n".join( list ( map(lambda x: x.rstrip(), st.split("\n"))))
    chars = list( st )
    comments = _find_comment_pos( st )
    comments.reverse()   # delete from the back so earlier positions stay valid
    for c in comments: del chars[ c[0]: c[1]]
    return "\n".join( list( map(lambda x: x.rstrip() , "".join( chars ).split("\n")) ) )

def _find_comment_pos( st ):
    cur = -1
    begin = 0
    comments = []
    state = States.Statement
    pstate = None
    func = _next
    while (True):
        func, pos, adv, state = func(st, cur + 1)
        # Search end
        if ( state == States.End):
            if (pstate == States.Comment):
                comments.append((begin, len(st)))
            break
        cur = pos
        # quote begins -> skip subsequent processing
        if (state == States.Quoted):
            pstate = state
            continue
        # end comment
        if (pstate == States.Comment):
            comments.append((begin, pos + adv))
            pstate = state
            continue
        # begin comment
        if (state == States.Comment):
            begin = pos - 1   # pos points at the 2nd character of "/*" or "--"
        pstate = state
    return comments
I no longer remember exactly what I wrote, so I will keep the explanation brief.
The main functions are strip_comment() and _find_comment_pos(), which searches for comment positions and returns a list of (start position, end position) pairs. The functions and objects lined up before them are helpers used by _find_comment_pos().
These miscellaneous functions were originally defined as inner functions of _find_comment_pos(), but when I ran
func, pos, adv, state = func(st, cur + 1)
the first func (which receives the function to use for the next search) apparently turned out to be a 3-item tuple instead of an ordinary function object.
Working out how to deal with that looked difficult, so I meekly changed them into ordinary functions. (Which is why the module ended up littered with extra functions ...)
The local variable `func` in _find_comment_pos() holds the function to use for the next search. The same thing could be done by branching on the current state and the matched characters ("/*", "'", a newline, and so on), but the conditional branching looked like it would get ugly, so each search function returns an appropriate next search function for the characters it hit. For example, once "/*" is found, the next thing to look for must be "*/", so the function that searches for it, _block_comment_end(), is chosen.
At first I thought it would be enough to return the hit position along with the function, but then I wanted this piece of information too, and needed that one as well, and in the end every function has to return 4 values... After that, I adjusted the numbers so that each comment position can be used directly as a slice, and packed them into the list. (I have forgotten the details..)
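The idea of "each search function returns the function to use next" is easier to see in a cut-down form. The sketch below is not the code from sql_helper.py: it handles block comments only, ignores quotes and line comments, and returns just two values instead of four. All names in it (`find_open`, `find_close`, `block_comment_spans`) are hypothetical.

```python
def find_open(text, start):
    """Look for "/*"; on a hit, hand over to find_close."""
    pos = text.find("/*", start)
    if pos < 0:
        return None, len(text)
    return find_close, pos

def find_close(text, start):
    """Look for "*/"; on a hit, hand back to find_open."""
    pos = text.find("*/", start)
    if pos < 0:
        return None, len(text)
    return find_open, pos + len("*/")

def block_comment_spans(text):
    """Return (begin, end) slices of the block comments in text."""
    spans = []
    func, cur, begin = find_open, 0, None
    while func is not None:
        current = func
        func, pos = current(text, cur)
        if current is find_open and func is not None:
            begin = pos                  # a comment starts here
            cur = pos + len("/*")
        elif current is find_close:
            spans.append((begin, pos))   # pos is len(text) if unterminated
            cur = pos
    return spans

print(block_comment_spans("a /* x */ b /* y */ c"))
```

The loop never inspects the characters itself; it only calls whatever function the previous step returned, which is what keeps the conditional branching small.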
Now that the comment positions are known, all that is left is to remove the comments with slices!!
...except that the string class has no method for removing the characters at a slice position!!
There was no way around it, so I convert the string to a list of characters with `chars = list( st )` and then run `del chars[begin:end]`.
With that I finally had comment removal working (my long-awaited prize, if not quite the long-awaited Ice Sword).
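The deletion step can be sketched on its own as follows. `remove_spans` follows the character-list approach described here; `remove_spans_by_slicing` is an alternative I am adding for comparison, not something from the original module. Both names and the sample data are hypothetical.

```python
def remove_spans(text, spans):
    """Delete each (begin, end) slice from text; spans must not overlap."""
    chars = list(text)
    # Work from the back so earlier indexes stay valid after each deletion.
    for begin, end in sorted(spans, reverse=True):
        del chars[begin:end]
    return "".join(chars)

def remove_spans_by_slicing(text, spans):
    """Same result without a character list: keep the text between spans."""
    kept, last = [], 0
    for begin, end in sorted(spans):
        kept.append(text[last:begin])
        last = end
    kept.append(text[last:])
    return "".join(kept)

sample = "a /* x */ b /* y */ c"
spans = [(2, 9), (12, 19)]
print(repr(remove_spans(sample, spans)))
print(repr(remove_spans_by_slicing(sample, spans)))
```

Deleting from the back is the reason strip_comment() reverses the list of comment positions before its `del` loop.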
As for the last return statement: removing a comment can leave whitespace at the end of a line (the "text; /* comment */\n" pattern), so I ended up writing one more roundabout step to strip that as well.
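A small sketch of why that extra step matters, using a made-up string that stands for text whose comment has already been removed:

```python
# The comment is gone, but the blank that preceded it is still there.
after_removal = "select 1; \nselect 2;"

# The separator ";\n" is not found because of the blank after ";".
print(after_removal.split(";\n"))

# Stripping the end of every line first makes the separator match again.
cleaned = "\n".join(line.rstrip() for line in after_removal.split("\n"))
print(cleaned.split(";\n"))
```

The first split returns the whole text as a single element, while the second one separates the two statements.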
# Finally
Here are the files used for testing and the test code. (sample_connection.txt is omitted because it has already been shown above.)
sample_multi_statement.sql
select * from SYSLIBL
where TYPE not like = "';%" ;/* comment */
select * from SYSTABLES ; /* "" ' '' ; ;
" */
; -- empty statement
select * from SYSCOLUMNS ;
select * from SYSIBM.SYSDUMMY1;
For checking the short-circuit path of strip_comment()
sample_multi_statement2.sql
select * from SYSLIBL
where TYPE not like = "';%" ;
select * from SYSTABLES ;
;
select * from SYSCOLUMNS ;
select * from SYSIBM.SYSDUMMY1;
sql_helper.py
def test_loading():
    from pprint import pprint

    file = "sample_multi_statement.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_multi_statement2.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_config.txt"
    print( f'\ntest load_config("{file}") ----------------------------')
    config = load_config( file )
    pprint( config )
Just reading files took more code than I expected. I started on the assumption that this would be used with IBM i, but I think what exists so far can serve as preprocessing for other DBMSs too.
Next time, we will start on the output side.