You can find a python chat program on the command line that is not a web app. However, none of them (those that can input asynchronously) work on Windows. For the time being, I made something that works, so I will publish it.
Start as "python server.py IP address"
server.py
import sys, socket, select
def broadcast(socklist, server_socket, sock, message):
print(message)
for socket in socklist:
if socket != server_socket and socket != sock :
try :
socket.send(message.encode())
except :
socket.close()
socklist.remove(socket)
if __name__ == '__main__':
port, socklist, server = 5001, [], '127.0.0.1' if len(sys.argv) <= 1 else sys.argv[1]
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((server, port))
server_socket.listen(10)
socklist.append(server_socket)
print('Start')
while True:
read_sockets, write_sockets, error_sockets = select.select(socklist, [], [])
for sock in read_sockets:
if sock == server_socket:
sockfd, addr = server_socket.accept()
socklist.append(sockfd)
broadcast(socklist, server_socket, sockfd, '[%s:%s] Enter' % addr)
else:
try:
data = sock.recv(4096).decode()
if data == '': raise Exception('Done')
if data:
broadcast(socklist, server_socket, sock, data)
except Exception as e:
print(e)
broadcast(socklist, server_socket, sock, '[%s, %s] Exit' % addr)
sock.close()
socklist.remove(sock)
Run as "python client.py username IP address"
client.py
import sys, socket, select, threading
def prompt(user) :
sys.stdout.write('%s> ' % user)
sys.stdout.flush()
if __name__ == "__main__":
if len(sys.argv) < 3:
print('Usage : python %s user host' % sys.argv[0])
sys.exit()
(user, host), port = sys.argv[1:3], 5001
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try :
server_sock.connect((host, port))
except :
print('Unable to connect')
sys.exit()
print('Start')
def listen():
while True:
read_sockets, write_sockets, error_sockets = select.select([server_sock], [], [])
try:
data = server_sock.recv(4096).decode()
except:
break
sys.stdout.write('\r%s\n' % data)
prompt(user)
print('\rTerminated')
t = threading.Thread(target=listen)
t.start()
prompt(user)
while True:
msg = sys.stdin.readline().strip()
if not msg:
server_sock.close()
break
try:
server_sock.send(('%s| %s' % (user, msg)).encode())
except:
break
prompt(user)
In Python2 series, only ASCII is OK.
Recommended Posts