1 Pluspunkt 0 Minuspunkte

Ich habe einen Server der Nachrichten in einem MessageQueue speichert und daraus liest. Die Länge des Queue ist aber immer 1 und danach wieder 0. Es scheint als ob der Queue nicht wachsen könnte weil ich sehr viele Nachrichten auf einmal sende und trotzdem dieses Verhalten auftritt.

import socket
import threading
import queue

message_queue = queue.Queue()

def handle_client(client_socket, addr):
    while True:
        data = client_socket.recv(100)
        if not data:
            break
        message = data.decode()
        print(f"Empfangene Nachricht von {addr}: {message}")
        message_queue.put((client_socket, message))  
        print("len: ", str(message_queue.qsize()))

    print("Client hat die Verbindung geschlossen")
    client_socket.close()

def process_messages():
    while True:
        client_socket, message = message_queue.get()  
        client_socket.send(message.encode())
        message_queue.task_done()
        print("len: ", str(message_queue.qsize()))

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 8888))
    server.listen(5)

    message_processor = threading.Thread(target=process_messages)
    message_processor.start()

    while True:
        client_socket, addr = server.accept()
        client_thread = threading.Thread(target=handle_client, args=(client_socket, addr))
        client_thread.start()

if __name__ == "__main__":
    main()
von  

1 Antwort

0 Pluspunkte 0 Minuspunkte

Es scheint du möchtest einen asynchronen Server implementieren. Asynchroner Code ist Code, der parallel abläuft und sozusagen hängen bleiben kann, während auf ein Ergebnis gewartet wird, damit in der Zwischenzeit anderer Code ausgeführt werden kann.   

import asyncio
import queue

message_queue = queue.Queue()

async def handle_client(reader, writer, message_queue):
    while True:
        data = await reader.read(100)
        if not data:
            break        
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print(f"Empfangene Nachricht von {addr}: {message}")        
        message_queue.put(message)          
        print("len: ", str(message_queue.qsize()))        
        writer.write(data)
        await writer.drain()    
    print("Client hat die Verbindung geschlossen")
    writer.close()

 
async def poll_server_for_messages(message_queue):
    while True:
        if not message_queue.empty():
            new_message = message_queue.get()
            print("Neue Nachricht empfangen:", new_message)
            message_queue.task_done()
            print("len: ", str(message_queue.qsize()))           
        await asyncio.sleep(0.00001)


async def main():
    server = await asyncio.start_server(lambda r, w: handle_client(r, w, message_queue), '127.0.0.1', 8888)   
    async with server:
        await server.serve_forever()
        
        
async def run_server_and_poll():
    server_task = asyncio.create_task(main())
    polling_task = asyncio.create_task(poll_server_for_messages(message_queue))   
    try:
        await asyncio.gather(server_task, polling_task)
    except KeyboardInterrupt:
        print("Server wird heruntergefahren...")
        server_task.cancel()
        polling_task.cancel()
        
        
asyncio.run(run_server_and_poll())

handle_client() wird für jeden Verbindungsaufbau aufgerufen und fügt die Nachricht zur Warteschlange message_queue hinzu.

poll_server_for_messages() wird in einer separaten Schleife ausgeführt und überwacht die Warteschlange. Wenn eine neue Nachricht verfügbar ist, wird sie entfernt und verarbeitet.

run_server_and_poll() startet zwei Aufgaben gleichzeitig: die Hauptserveraufgabe (main()) und die Nachrichtenüberwachungsaufgabe (poll_server_for_messages()).

von (868 Punkte)