This is a memo on SQLAlchemy, a library that lets you write SQL in a Python-like way. Introducing such a library helps you build services with fewer bugs, because it removes most SQL syntax mistakes.
The explanation below is based on version 1.1.
By defining tables in code, SQLAlchemy lets you handle column information programmatically. It also lets you move table creation from a manual task into code.
http://docs.sqlalchemy.org/en/rel_1_1/core/metadata.html
import sqlalchemy as sa

metadata = sa.MetaData()  # container that collects the table definitions
user = sa.Table('user', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('user_name', sa.String(16), nullable=False),
    sa.Column('email_address', sa.String(60)),
    sa.Column('password', sa.String(20), nullable=False)
)
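As a minimal sketch of moving table creation into code, the definition above can be handed to `metadata.create_all()`. An in-memory SQLite engine is assumed here only so that the example runs without a MySQL server; it is not part of the original setup.

```python
import sqlalchemy as sa

metadata = sa.MetaData()
user = sa.Table('user', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('user_name', sa.String(16), nullable=False),
    sa.Column('email_address', sa.String(60)),
    sa.Column('password', sa.String(20), nullable=False),
)

# Assumption: in-memory SQLite stands in for the real MySQL database.
engine = sa.create_engine('sqlite://')
metadata.create_all(engine)  # emits CREATE TABLE for every table in metadata

# Column information is now available programmatically.
table_names = sa.inspect(engine).get_table_names()
column_names = [c.name for c in user.columns]
print(table_names)
print(column_names)
```

The same `metadata` object can be reused for every table in the service, so one `create_all()` call covers all of them.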
As described in AUTO_INCREMENT Behavior, autoincrement is applied automatically to the first column that meets all of the following conditions:
- It is a primary key
- It is an Integer
- It is not a foreign key
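One way to check this rule without a database is to render the CREATE TABLE statement for the MySQL dialect. The sketch below uses `sqlalchemy.schema.CreateTable`; the `tag` table with a string primary key is a hypothetical example added only for contrast.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

metadata = sa.MetaData()

# Integer primary key that is not a foreign key: gets AUTO_INCREMENT.
user = sa.Table('user', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('user_name', sa.String(16), nullable=False),
)

# Hypothetical table: the primary key is a string, so the rule does not apply.
tag = sa.Table('tag', metadata,
    sa.Column('tag_name', sa.String(16), primary_key=True),
)

user_ddl = str(CreateTable(user).compile(dialect=mysql.dialect()))
tag_ddl = str(CreateTable(tag).compile(dialect=mysql.dialect()))
print(user_ddl)
print(tag_ddl)
```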
To make an integer column unsigned, use sqlalchemy.dialects.mysql.INTEGER. A "dialect" here is the database-specific variant, in this case MySQL. If you want to use MySQL-specific features, it may be better to use the types under dialects.mysql.
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import INTEGER
...
sa.Column('scenario_id', INTEGER(unsigned=True), nullable=False)
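To see what the MySQL-specific type produces, the column can be placed in a table and rendered as DDL. This is only a sketch; the table name `scenario` is an assumption made for the example.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import INTEGER
from sqlalchemy.schema import CreateTable

metadata = sa.MetaData()

# Hypothetical table holding the unsigned column from the snippet above.
scenario = sa.Table('scenario', metadata,
    sa.Column('scenario_id', INTEGER(unsigned=True), nullable=False),
)

ddl = str(CreateTable(scenario).compile(dialect=mysql.dialect()))
print(ddl)  # the column type is rendered with the UNSIGNED keyword
```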
The other types are listed in MySQL Data Types:
from sqlalchemy.dialects.mysql import \
BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, \
DATETIME, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, \
LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, \
NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, \
TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR
And so on.
There are two ways to specify a default: default and server_default.
- default: supplies a default value in the Python layer when no value is specified
  - Does not affect the CREATE TABLE statement (probably)
- server_default: the so-called SQL DEFAULT clause
  - Emits a DEFAULT clause in CREATE TABLE
Here, we will explain server_default.
sa.Column('x', sa.Text, server_default="val")
↓
x TEXT DEFAULT 'val'
sa.Column('y', sa.DateTime, server_default=sa.text('NOW()'))
↓
y DATETIME DEFAULT NOW()
A plain string, as in the first example above, is enclosed in quotation marks. If you don't want it quoted, wrap it in text.
For the common case of a create_datetime column, the default is written as follows.
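The difference between the two kinds of default can be checked by rendering the DDL. The following is a sketch under assumptions: the table name `example` and column `a` are made up, and String is used instead of Text to keep the rendered statement simple.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

metadata = sa.MetaData()

# Hypothetical table mixing a Python-side default with server-side defaults.
example = sa.Table('example', metadata,
    sa.Column('a', sa.String(16), default='py_side'),              # Python layer only
    sa.Column('x', sa.String(16), server_default='val'),           # quoted string literal
    sa.Column('y', sa.DateTime, server_default=sa.text('NOW()')),  # raw SQL, not quoted
)

ddl = str(CreateTable(example).compile(dialect=mysql.dialect()))
print(ddl)
```

In the printed statement, the Python-side value for `a` does not appear at all, which supports the "does not affect the create statement" guess above.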
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import DATETIME
sa.Column('create_datetime', DATETIME(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')),
Like default, there are onupdate and server_onupdate. onupdate works in the Python layer, just as default does. Note, however, that server_onupdate does not add anything to the CREATE TABLE statement: it only tells SQLAlchemy that the server updates the column. To get MySQL's ON UPDATE clause into the DDL, include it in the server_default text.
A common timestamp (update time) column can be written as:
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import DATETIME
sa.Column('timestamp', DATETIME(), nullable=False,
    server_default=sa.text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))
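When checking this on my side, it helps to render the DDL for both spellings. The sketch below compares a column that relies on server_onupdate with one that puts the ON UPDATE clause into the server_default text. As far as I know, only the latter shows up in CREATE TABLE for MySQL, so treat the first variant with care. The table names are made up for the example.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.schema import CreateTable

metadata = sa.MetaData()

# Hypothetical table relying on server_onupdate alone.
with_flag = sa.Table('with_flag', metadata,
    sa.Column('timestamp', DATETIME(), nullable=False,
              server_default=sa.text('CURRENT_TIMESTAMP'),
              server_onupdate=sa.text('CURRENT_TIMESTAMP')),
)

# Hypothetical table spelling out the whole clause in server_default.
with_clause = sa.Table('with_clause', metadata,
    sa.Column('timestamp', DATETIME(), nullable=False,
              server_default=sa.text(
                  'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP')),
)

ddl_flag = str(CreateTable(with_flag).compile(dialect=mysql.dialect()))
ddl_clause = str(CreateTable(with_clause).compile(dialect=mysql.dialect()))
print(ddl_flag)
print(ddl_clause)
```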
table = sa.Table('user', metadata,
sa.Column('user_id', sa.Integer, primary_key=True),
sa.Column('user_name', sa.String(16), nullable=False),
sa.Column('email_address', sa.String(60)),
sa.Column('password', sa.String(20), nullable=False)
)
Suppose a variable called table is defined as above.
We are only building queries here; nothing happens until a query is actually passed to a DB engine. Please refer to other documents for how to do that in general. (I won't cover the general usage here, because I am using aiomysql and it seems to differ from it.)
select
import sqlalchemy as sa
q = sa.select([table.c.user_id, table.c.user_name]).where(table.c.user_id == 1234)
# or, to fetch every column of the table
q_all = table.select().where(table.c.user_id == 1234)
Pass select() the list of columns you want to fetch, then chain where() to specify the condition. table.c.<name> refers to a column, so the condition above means that the value of the user_id column is 1234.
It seems that column names cannot be specified in table.select(); it always selects every column of the table.
Read ↓ for details. http://docs.sqlalchemy.org/en/rel_1_1/core/selectable.html
By the way, if you want to check the generated SQL and its bound parameters, for example in a test, you can do the following.
print(str(q))
# SELECT user.user_id, user.user_name
# FROM user
# WHERE user.user_id = :user_id_1
print(q.compile().params)
# {'user_id_1': 1234}
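The same check works when several conditions are combined. This is a small sketch using `table.select()` and `sa.and_()`; the second condition on user_name is an assumption added for illustration.

```python
import sqlalchemy as sa

metadata = sa.MetaData()
table = sa.Table('user', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('user_name', sa.String(16), nullable=False),
    sa.Column('email_address', sa.String(60)),
    sa.Column('password', sa.String(20), nullable=False),
)

# Two conditions joined with AND; each literal becomes a bound parameter.
q = table.select().where(
    sa.and_(table.c.user_id == 1234, table.c.user_name == 'hal')
)

sql = str(q)                  # SQL text with named placeholders
params = q.compile().params   # values bound to those placeholders
print(sql)
print(params)
```

Because the values never appear in the SQL text itself, a test can compare `params` against a plain dict.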
insert
q = table.insert().values(
# user_id is auto increment
user_name='hal',
#Since email is not specified, it will be null
password='greatpassword'
)
You can build a query with the values given in insert().values(). It's easy.
To check the generated SQL and its bound parameters in a test, do the following.
print(str(q))
# INSERT INTO user (user_name, password) VALUES (:user_name, :password)
print(q.compile().params)
# {'user_name': 'hal', 'password': 'greatpassword'}
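The placeholders above use the generic style. To see what would be sent to MySQL, the query can be compiled against the MySQL dialect, which does not need a connection. A sketch:

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

metadata = sa.MetaData()
table = sa.Table('user', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('user_name', sa.String(16), nullable=False),
    sa.Column('email_address', sa.String(60)),
    sa.Column('password', sa.String(20), nullable=False),
)

q = table.insert().values(user_name='hal', password='greatpassword')

# Compile for MySQL: named placeholders become the driver's %s style.
compiled = q.compile(dialect=mysql.dialect())
sql = str(compiled)
params = compiled.params
print(sql)
print(params)
```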
Here is how to execute the queries created above using aiomysql. Normally you would build an engine with SQLAlchemy's own functions, so if you want the usual approach, read the SQLAlchemy documentation on engines instead.
I am using aiomysql together with asyncio, a relatively new feature that handles asynchronous processing well, so I will briefly touch on how to use it in that case. The official documentation is http://aiomysql.readthedocs.io/en/latest/sa.html.
It is convenient to wrap the engine and connection in a context manager, as shown below.
import asyncio
from aiomysql.sa import create_engine

class MyDB:
    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        config = self._load_db_config()  # config loader, not shown in this memo
        self._engine = await create_engine(  # Make an engine with aiomysql
            host=config['host'],  # The arguments are the same as those of aiomysql.connect
            port=config['port'],
            user=config['user'],
            password=config['password'],
            db=config['database'],
            charset='utf8',
            autocommit=True,  # If True, an insert is reflected as soon as it is executed
            loop=loop
        )
        self._connection = await self._engine.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._connection.close()  # close() is a coroutine; it returns the connection to the pool
        self._engine.close()  # also shut down the pool created in __aenter__
        await self._engine.wait_closed()

    async def execute(self, query, *multiparams, **params):
        return await self._connection.execute(query, *multiparams, **params)
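The async context manager pattern itself can be tried without a database. The sketch below replaces the real connection with a hypothetical `FakeConnection` that only records calls, so every name in it is an assumption and none of it is aiomysql API.

```python
import asyncio

class FakeConnection:
    """Hypothetical stand-in for a DB connection; it only records calls."""
    def __init__(self):
        self.closed = False
        self.queries = []

    async def execute(self, query):
        self.queries.append(query)
        return [('hal',)]  # canned result instead of real rows

    async def close(self):
        self.closed = True

class FakeDB:
    async def __aenter__(self):
        self._connection = FakeConnection()  # acquired on entering the block
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._connection.close()  # always released on leaving the block

    async def execute(self, query):
        return await self._connection.execute(query)

async def demo():
    async with FakeDB() as db:
        rows = await db.execute('SELECT 1')
        conn = db._connection
    return rows, conn.closed

rows, closed = asyncio.run(demo())
print(rows, closed)
```

The point of the wrapper is that the release step in `__aexit__` runs even when the body raises, so callers cannot forget it.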
User side
table = sa.Table('user', metadata,
sa.Column('user_id', sa.Integer, primary_key=True),
sa.Column('user_name', sa.String(16), nullable=False),
sa.Column('email_address', sa.String(60)),
sa.Column('password', sa.String(20), nullable=False)
)
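async def main():
    async with MyDB() as db:
        q = sa.select([table.c.user_id, table.c.user_name]).where(table.c.user_id == 1234)
        result = await db.execute(q)  # execute() is a coroutine
        row_list = await result.fetchall()  # so is fetchall() on the aiomysql result
        for row in row_list:
            print(row[table.c.user_name])

asyncio.get_event_loop().run_until_complete(main())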