** Note: This content is a migration of the old site. So please understand that the content may be strange. ** ** ** Note 2 This content is a copy from kumitatepazuru's blog to let more people know about your tool. is. Use it! ** **
This time, I will explain the library libscips that I am making.
Our zyo_sen team used agent2d (gliders2d) so far. However, agent2d is C ++ and our specialty (?) Is python, so it was difficult to decipher. And since it was originally an AI classroom team, I wanted to fight with AI. But there isn't much information in C ++ ... what should I do?
At that time, I came up with this library.
The base used by agent2d is a library called librcsc that communicates with the server. I thought that I should make it myself. Fortunately, I used to use a method that was not based on agent2d in the past, so I created it with reference to that. And what I was able to do
lib soccer communicate in python system In ** libscips ** was.
This library is
90% or more of the total is a program such as soccer calculation
I plan to do it with the goal of saying that.
First, let's have a look at player.py.
import json
from socket import socket, AF_INET, SOCK_DGRAM
class analysis:
def __init__(self, error, analysis_log):
self.error = error
self.analysis_log = analysis_log
def msg_analysis(self, text, log_show=None):
text = text[0]
if text[0] == "error":
text = text[1].replace("_", " ")
log = "\033[38;5;1m[ERR]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[4m" + text + "\033[0m"
r = {"type": "error", "value": str(self.error.get(text) + (self.error.get(text) is None))}
elif text[0] == "init":
self.no = text[2]
log = "\033[38;5;10m[OK]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10minit msg.\t\033[4m" + "\033[38;5;11mleft team" * (
text[1] == "l") + \
"\033[38;5;1mright team" * (text[1] == "r") + "\033[0m\033[38;5;6m no \033[4m" + text[2] + "\033[0m"
r = {"type": "init", "value": text[:-2]}
elif text[0] == "server_param" or text[0] == "player_param" or text[0] == "player_type":
log = "\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10m" + text[0] + " msg.\033[0m"
r = {"type": text[0], "value": text[1:]}
elif text[0] == "see" or text[0] == "sense_body":
log = "\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10m" + text[0] + " msg. \033[38;5;9mtime \033[4m" + text[
1] + "\033[0m"
r = {"type": text[0], "time": int(text[1]), "value": text[2:]}
elif text[0] == "hear":
log = "\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10mhear msg. \033[38;5;9mtime \033[4m" + text[1] + "\033[0m " + \
"\033[38;5;6mspeaker \033[4m" + text[2] + "\033[0m " + "\033[38;5;13mcontents \033[4m" + text[3] + \
"\033[0m"
r = {"type": "hear", "time": int(text[1]), "speaker": text[2], "contents": text[3]}
elif text[0] == "change_player_type":
log = "\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10mhear msg. \033[0m"
r = {"type": "change_player_type", "value": text[1]}
else:
log = "\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10mUnknown return value \033[0m\033[4m" + str(text) + "\033[0m"
r = {"type": "unknown", "value": text}
if log_show is None:
log_show = r["type"] in self.analysis_log
if log_show:
print(log)
return r
def see_analysis(self, text, hit, log_show=None):
if type(hit) == str:
hit = [hit]
text = text[0]
for i in text[2:]:
if i[0] == hit:
if log_show is None:
log_show = hit in self.analysis_log or hit[0] in self.analysis_log
if log_show:
print("\033[38;5;12m[INFO]\t\033[38;5;10mThere was a " + str(
hit) + " in the visual information.\033[0m")
return i[1:]
if log_show:
print("\033[38;5;12m[INFO]\t\033[38;5;10mThere was no " + str(hit) + " in the visual information.\033[0m")
return None
class player_signal(analysis):
def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False, analysis_log=("unknown",
"init", "error")):
self.ADDRESS = ADDRESS
self.s = socket(AF_INET, SOCK_DGRAM)
ok = 0
i = 0
print("\033[38;5;12m[INFO]\t\033[38;5;13mSearching for available ports ...\033[0m")
while ok == 0:
try:
self.s.bind((HOST, 1000 + i))
ok = 1
except OSError:
i += 1
self.recieve_port = 1000 + i
self.recieve_log = recieve_log
self.send_log = send_log
self.analysis_log = analysis_log
self.no = ""
self.player_port = 0
self.error = {"no more player or goalie or illegal client version": 0}
super().__init__(self.error, self.analysis_log)
def __del__(self):
self.s.close()
def send_msg(self, text, PORT=6000, log=None):
self.s.sendto((text + "\0").encode(), (self.ADDRESS, PORT))
self.send_logging(text, PORT, log=log)
def send_logging(self, text, PORT, log=None):
if log is None:
log = self.send_log
if log:
print("\033[38;5;12m[INFO]\t" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (self.no != "") + "\033[38;5;10mSend msg.\t" +
"\033[38;5;9mPORT \033[4m" + str(self.recieve_port) + "\033[0m\033[38;5;9m → \033[4m" + str(PORT) +
"\033[0m\t\033[38;5;6mTEXT \033[4m" + text + "\033[0m")
def send_init(self, name, goalie=False, version=15, log=None):
msg = "(init " + name + " (goalie)" * goalie + " (version " + str(version) + "))"
self.send_msg(msg, log=log)
r = self.recieve_msg(log=log)
self.player_port = r[1][1]
return r
def send_move(self, x, y, log=None):
msg = "(move " + str(x) + " " + str(y) + ")"
self.send_msg(msg, self.player_port, log=log)
def send_dash(self, power, log=None):
msg = "(dash " + str(power) + ")"
self.send_msg(msg, self.player_port, log=log)
def send_turn(self, moment, log=None):
msg = "(turn " + str(moment) + ")"
self.send_msg(msg, self.player_port, log=log)
def send_turn_neck(self, angle, log=None):
msg = "(turn_neck " + str(angle) + ")"
self.send_msg(msg, self.player_port, log=log)
def send_kick(self, power, direction, log=None):
msg = "(kick " + str(power) + " " + str(direction) + ")"
self.send_msg(msg, self.player_port, log=log)
def recieve_msg(self, log=None):
msg, address = self.s.recvfrom(8192)
if log is None:
log = self.recieve_log
if log:
print("\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[0m\033[38;5;10mGet msg.\t\033[38;5;9mPORT \033[4m" + str(
self.recieve_port) + "\033[0m\033[38;5;9m ← \033[4m" +
str(address[1]) + "\033[0m\t\033[38;5;6mIP \033[4m" + address[0] + "\033[0m")
return json.loads(msg[:-1].decode("utf-8").replace(" ", " ").replace("(", '["').replace(")", '"]').
replace(" ", '","').replace('"[', "[").replace(']"', "]").replace("][", "],[").
replace('""', '"')), address
It's a short program with 156 lines. I will explain this one by one.
This program does not currently use external libraries.
Please refer to libscips WIKI for how to use each class.
First, I will explain from analysis.
Normally, it is used by inheriting to player_signal described later. The classes are separated just for the sake of clarity.
So the very first \ _ \ _ init \ __ function
def __init__(self, error, analysis_log):
self.error = error
self.analysis_log = analysis_log
Is for completely avoiding NameError & Pycharm warnings. You don't have to.
The following msg_analysis is a program that decomposes the argument text, classifies the types of commands, performs conditional branching, and returns it as an easy-to-use dictionary type.
The content of the argument text is
(["see", "0", [["b"], "10", "0"], ...], (127.0.0.1, 6000))
Like this.
def msg_analysis(self, text, log_show=None):
text = text[0]
if text[0] == "error":
text = text[1].replace("_", " ")
log = "\033[38;5;1m[ERR]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[4m" + text + "\033[0m"
r = {"type": "error", "value": str(self.error.get(text) + (self.error.get(text) is None))}
elif text[0] == "init":
self.no = text[2]
log = "\033[38;5;10m[OK]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[38;5;10minit msg.\t\033[4m" + "\033[38;5;11mleft team" * (
text[1] == "l") + \
"\033[38;5;1mright team" * (text[1] == "r") + "\033[0m\033[38;5;6m no \033[4m" + text[2] + "\033[0m"
r = {"type": "init", "value": text[:-2]}
# ---Conditional branch continues---
if log_show is None:
log_show = r["type"] in self.analysis_log
if log_show:
print(log)
return r
It is a function in which conditional branching of commands continues forever.
The variable text contains the data to be parsed. The reason why text = text [0]
is included at the very beginning is that as you can see from the example of the argument text written above, the original data also contains the sender information, which is an obstacle. I'm erasing it.
The variable log is displayed when the command applies to log_show or self.analysis_log.
The variable r contains the dictionary type to be returned.
Next is see_analysis. It is a function that returns if there is a specific object in the see information and makes it easy to handle.
def see_analysis(self, text, hit, log_show=None):
if type(hit) == str:
hit = [hit]
text = text[0]
for i in text[2:]:
if i[0] == hit:
if log_show is None:
log_show = hit in self.analysis_log or hit[0] in self.analysis_log
if log_show:
print("\033[38;5;12m[INFO]\t\033[38;5;10mThere was a " + str(
hit) + " in the visual information.\033[0m")
return i[1:]
if log_show:
print("\033[38;5;12m[INFO]\t\033[38;5;10mThere was no " + str(hit) + " in the visual information.\033[0m")
return None
the first
if type(hit) == str:
hit = [hit]
Since the object data is listed, an error will occur if you process it in string format. So in the case of string format, it is converted to a list.
After that, if the object data is searched, the object information is returned. And if log_show is True, it's a really simple function to log.
Next is the player_signal class.
The very first \ _ \ _ init \ _ \ _ function
def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False, analysis_log=("unknown","init", "error")):
self.ADDRESS = ADDRESS
self.s = socket(AF_INET, SOCK_DGRAM)
ok = 0
i = 0
print("\033[38;5;12m[INFO]\t\033[38;5;13mSearching for available ports ...\033[0m")
while ok == 0:
try:
self.s.bind((HOST, 1000 + i))
ok = 1
except OSError:
i += 1
self.recieve_port = 1000 + i
self.recieve_log = recieve_log
self.send_log = send_log
self.analysis_log = analysis_log
self.no = ""
self.player_port = 0
self.error = {"no more player or goalie or illegal client version": 0}
super().__init__(self.error, self.analysis_log)
Is a function that makes initial settings and secures a port. Put the argument into self
while ok == 0:
try:
self.s.bind((HOST, 1000 + i))
ok = 1
except OSError:
i += 1
Here, turn until OSError disappears to search for and secure a free port.
And
super().__init__(self.error, self.analysis_log)
At this point, execute the error avoidance init mentioned earlier.
The \ _ \ _ init \ _ \ _ function looks like this.
Next is the \ _ \ _ del \ _ \ _ function.
def __del__(self):
self.s.close()
A function that opens a port when the program ends. It will be released automatically, but just in case.
Next is the send_msg function.
def send_msg(self, text, PORT=6000, log=None):
self.s.sendto((text + "\0").encode(), (self.ADDRESS, PORT))
self.send_logging(text, PORT, log=log)
This function sends a message, but what it does is send a message on the second line, and display it when displaying the log on the third line (call send_logging described later). That's it.
Next is send_logging, which was mentioned earlier. I'm just using the print statement to log.
def send_logging(self, text, PORT, log=None):
if log is None:
log = self.send_log
if log:
print("\033[38;5;12m[INFO]\t" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (self.no != "") + "\033[38;5;10mSend msg.\t" +
"\033[38;5;9mPORT \033[4m" + str(self.recieve_port) + "\033[0m\033[38;5;9m → \033[4m" + str(PORT) +
"\033[0m\t\033[38;5;6mTEXT \033[4m" + text + "\033[0m")
If log is None, it refers to the send_log specified by the \ _ \ _ init \ _ \ _ function, and if it is True, it is displayed, and if it is True, it is displayed without any questions.
Next is send_init. As the name implies, a function that just sends an init command.
def send_init(self, name, goalie=False, version=15, log=None):
msg = "(init " + name + " (goalie)" * goalie + " (version " + str(version) + "))"
self.send_msg(msg, log=log)
r = self.recieve_msg(log=log)
self.player_port = r[1][1]
return r
Roughly speaking
Create msg to send in second line
Actually send on the 3rd line (call the send_msg function)
Check the server response on the 4th line (call the recieve_msg function described later)
Check the server port for move commands, etc. on the 5th line (server port 6000 certainly only accepts init commands)
The 6th line returns a response as a return value.
Feeling like.
Next is send_move, send_dash, send_turn, send_turn_neck, send_kick. A function that moves the player.
The contents are (send_move)
def send_move(self, x, y, log=None):
msg = "(move " + str(x) + " " + str(y) + ")"
self.send_msg(msg, self.player_port, log=log)
It just creates and sends msg (calls send_msg).
Finally recieve_msg. As the name implies, a function that receives information sent from a server. It is also my best masterpiece function.
def recieve_msg(self, log=None):
msg, address = self.s.recvfrom(8192)
if log is None:
log = self.recieve_log
if log:
print("\033[38;5;12m[INFO]" + (
"\033[38;5;13mno \033[4m" + self.no + "\033[0m ") * (
self.no != "") + "\t\033[0m\033[38;5;10mGet msg.\t\033[38;5;9mPORT \033[4m" + str(
self.recieve_port) + "\033[0m\033[38;5;9m ← \033[4m" +
str(address[1]) + "\033[0m\t\033[38;5;6mIP \033[4m" + address[0] + "\033[0m")
return json.loads(msg[:-1].decode("utf-8").replace(" ", " ").replace("(", '["').replace(")", '"]').
replace(" ", '","').replace('"[', "[").replace(']"', "]").replace("][", "],[").
replace('""', '"')), address
To explain
Receive a message from the server on the second line
Show log if necessary on lines 3-8
Correct the message to information that is easy to handle as a return value on the 9th to 11th lines.
What's great is that it only takes one line to make the information easy to handle. (Pycharm's automatic formatting only has 3 lines.)
The reason for using json.loads is that json can handle lists as well as dictionary types as shown below.
So, I use json.loads to convert from a list of strings to a list.
However, since the received information cannot be converted to the list type, it is converted to the list type after being converted to the list type.
[
{
"hello":"jobs"
},
[
"contents"
]
]
This is the only function at this stage. If functions are added in the update, I will increase it in ② and ③ at any time.
I'd like to make a cszp version soon ... I don't think it will end in my lifetime.
I thought I was writing it, but the font of the article is easy to read and it is heartwarming and perfect for blogging! It's a great match. I'm a little happy.
See you someday.
For personal questions, please contact here.
https://forms.gle/V6NRhoTooFw15hJdA
Also, the Robocup soccer simulation league team in which I participate is looking for participants! We look forward to hearing from you if you would like to observe the activity or participate in the activity!
Recommended Posts