MySQL provides the aes_encrypt and aes_decrypt functions for encrypting column values. There are many examples of using these functions in raw SQL statements, but few that use SQLAlchemy. This article shows how to reference an encrypted column with SQLAlchemy.
On MS Windows, the encrypted data could be stored in VARBINARY columns and handled as shown below. On Ubuntu, however, this approach raised an error, so, as explained later, the encrypted columns are declared as VARCHAR instead and hold the output of hex(aes_encrypt()).
Table name: server_login_info
| No. | Column name | Type | PK | NN | UQ | AI | Comment |
|---|---|---|---|---|---|---|---|
| 1 | id | INT(11) | ☑ | ☑ | | ☑ | Primary key |
| 2 | name | VARCHAR(20) | | ☑ | ☑ | | |
| 3 | user | VARBINARY(25) | | | | | Login username (encrypted) |
| 4 | password | VARBINARY(30) | | | | | Login password (encrypted) |
Add sample data to the table
-- Build the binary AES key: the SHA-512 hex digest of the passphrase 'python', converted to raw bytes with unhex().
SET @key_str =unhex(sha2('python',512));
-- Insert three sample servers; user and password are stored as the raw AES_ENCRYPT output.
INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', AES_ENCRYPT('ozawa', @key_str),AES_ENCRYPT('password1', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', AES_ENCRYPT('luffy', @key_str),AES_ENCRYPT('goingmerry', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', AES_ENCRYPT('homura', @key_str),AES_ENCRYPT('entropy', @key_str));
Confirm that the rows were registered.
-- Decrypt both columns with the same @key_str to check the stored rows.
SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
Execution result:
mysql> SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name | user | password |
+--------------+--------+------------+
| madokamagica | homura | entropy |
| servicenow | ozawa | password1 |
| servicehub | luffy | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)
First, define the model. The key point is to define the decrypted fields separately from the encrypted columns. models.py
from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property
def _decrypted_text(encrypted_column):
    """Return a mapped attribute that decrypts *encrypted_column* in SQL.

    The expression is ``aes_decrypt(column, unhex(sha2('python', 512)))``
    wrapped in a cast to the MySQL ``CHAR`` type with charset ``utf8mb4``.
    """
    aes_key = func.unhex(func.sha2('python', 512))
    plaintext = func.aes_decrypt(encrypted_column, aes_key)
    return column_property(cast(plaintext, CHAR(charset='utf8mb4')))


class ServerLoginInfo(db.Model):
    """Model for the ``server_login_info`` table (VARBINARY variant).

    ``user`` and ``password`` hold the encrypted bytes as stored;
    ``user_name`` and ``user_password`` are SQL expressions that expose
    the decrypted text of those columns.
    """

    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)

    # Encrypted values exactly as stored in the table.
    # NOTE(review): AES_ENCRYPT output is padded to a multiple of 16 bytes,
    # so widths of 25/30 only have room for a single block (plaintext of up
    # to 15 bytes) - confirm that limit is intended.
    user = db.Column(db.VARBINARY(25))
    password = db.Column(db.VARBINARY(30))

    # Decrypted views of the two columns above.
    user_name = _decrypted_text(user)
    user_password = _decrypted_text(password)

    def __init__(self, name, user, password):
        """Store the server name and the (already encrypted) credentials."""
        self.name, self.user, self.password = name, user, password

    def __repr__(self):
        """Return a debug string containing the id, name and decrypted credentials."""
        return (
            f"pc_info('{self.id}', '{self.name}', '{self.user_name}', "
            f" '{self.user_password}')"
        )
database.py
from sample.models import ServerLoginInfo
def get_login_info(server_name):
    """Fetch the decrypted login credentials registered for *server_name*.

    Returns ``None`` (after printing an error message) when no row has that
    name, an empty dict when the row's ``user`` attribute is ``None``, and
    otherwise a dict with the keys ``'user'`` and ``'password'``.
    """
    record = (
        db.session.query(ServerLoginInfo)
        .filter_by(name=server_name)
        .first()
    )
    if record is None:
        print(f'error. The specified server cannot be found. Server name:{server_name}')
        return None

    # NOTE(review): the flattened listing does not show whether the password
    # assignment sat inside the ``user is not None`` check; it is assumed
    # here that both entries were guarded by it - confirm against the original.
    if record.user is None:
        return {}
    return {
        'user': record.user_name,
        'password': record.user_password,
    }
if __name__ == '__main__':
    # Demo run: look up one of the sample servers and print the result.
    print(get_login_info('madokamagica'))
Execution result:
C:/Users/ozawa/sample/database_api.py
{'user': 'homura', 'password': 'entropy'}
Process finished with exit code 0
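Table name: server_login_info (VARCHAR variant; the encrypted values are stored hex-encoded)
| No. | Column name | Type | PK | NN | UQ | AI | Comment |
|---|---|---|---|---|---|---|---|
| 1 | id | INT(11) | ☑ | ☑ | | ☑ | Primary key |
| 2 | name | VARCHAR(20) | | ☑ | ☑ | | |
| 3 | user | VARCHAR(25) | | | | | Login username (encrypted) |
| 4 | password | VARCHAR(30) | | | | | Login password (encrypted) |

Note: hex(aes_encrypt(...)) yields 32 hexadecimal characters for every 16-byte cipher block, so VARCHAR(25) and VARCHAR(30) are too short even for the short sample values below. If the INSERT statements fail with "Data too long for column", widen both columns (for example to VARCHAR(64)).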
Add sample data to the table
-- Build the binary AES key: the SHA-512 hex digest of the passphrase 'python', converted to raw bytes with unhex().
SET @key_str =unhex(sha2('python',512));
-- Insert three sample servers; each ciphertext is hex-encoded with hex() before being stored.
INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', hex(AES_ENCRYPT('ozawa', @key_str)), hex(AES_ENCRYPT('password1', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', hex(AES_ENCRYPT('luffy', @key_str)) , hex(AES_ENCRYPT('goingmerry', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', hex(AES_ENCRYPT('homura', @key_str)), hex(AES_ENCRYPT('entropy', @key_str)));
Confirm that the rows were registered.
-- Convert the stored hex text back to bytes with unhex(), then decrypt with the same @key_str.
SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password),@key_str) as password FROM server_login_info;
Execution result:
mysql> SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password), @key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name | user | password |
+--------------+--------+------------+
| madokamagica | homura | entropy |
| servicenow | ozawa | password1 |
| servicehub | luffy | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)
First, define the model. The key point is to define the decrypted fields separately from the encrypted columns. models.py
from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property
def _decrypted_hex_text(hex_column):
    """Return a mapped attribute that decrypts the hex-encoded *hex_column* in SQL.

    The expression is ``aes_decrypt(unhex(column), unhex(sha2('python', 512)))``
    wrapped in a cast to the MySQL ``CHAR`` type with charset ``utf8mb4``.
    Defining it once keeps the key derivation identical for every
    decrypted attribute.
    """
    aes_key = func.unhex(func.sha2('python', 512))
    plaintext = func.aes_decrypt(func.unhex(hex_column), aes_key)
    return column_property(cast(plaintext, CHAR(charset='utf8mb4')))


class ServerLoginInfo(db.Model):
    """Model for the ``server_login_info`` table (VARCHAR + HEX variant).

    ``user`` and ``password`` hold ``HEX(AES_ENCRYPT(...))`` text as stored;
    ``user_name`` and ``user_password`` are SQL expressions that expose the
    decrypted text of those columns.
    """

    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)

    # Hex-encoded ciphertext. AES_ENCRYPT pads its output to a multiple of
    # 16 bytes and HEX() doubles the length, so even a one-character value
    # needs 32 characters; the former widths of 25 and 30 could not hold any
    # encrypted value. 64 leaves room for two cipher blocks (plaintext of up
    # to 31 bytes). The table's own column definitions must be at least this
    # wide as well.
    user = db.Column(db.String(64))  # Encrypted username (hex text)
    password = db.Column(db.String(64))  # Encrypted password (hex text)

    # Decrypted views of the two columns above.
    user_name = _decrypted_hex_text(user)
    user_password = _decrypted_hex_text(password)

    def __init__(self, name, user, password):
        """Store the server name and the (already encrypted, hex-encoded) credentials."""
        self.name = name
        self.user = user
        self.password = password

    def __repr__(self):
        """Return a debug string containing the id, name and decrypted credentials."""
        return (
            f"pc_info('{self.id}', '{self.name}', '{self.user_name}', "
            f" '{self.user_password}')"
        )
database.py
from sample.models import ServerLoginInfo
def get_login_info(server_name):
    """Fetch the decrypted login credentials registered for *server_name*.

    Returns ``None`` (after printing an error message) when no row has that
    name, an empty dict when the row's ``user`` attribute is ``None``, and
    otherwise a dict with the keys ``'user'`` and ``'password'``.
    """
    record = (
        db.session.query(ServerLoginInfo)
        .filter_by(name=server_name)
        .first()
    )
    if record is None:
        print(f'error. The specified server cannot be found. Server name:{server_name}')
        return None

    # NOTE(review): the flattened listing does not show whether the password
    # assignment sat inside the ``user is not None`` check; it is assumed
    # here that both entries were guarded by it - confirm against the original.
    if record.user is None:
        return {}
    return {
        'user': record.user_name,
        'password': record.user_password,
    }
if __name__ == '__main__':
    # Demo run: look up one of the sample servers and print the result.
    print(get_login_info('madokamagica'))
That's all.
Recommended Posts