A better version using Thread.
It allows only one connection. If you want more remove the lock.
Code: Select all
import time
import RPi.GPIO as GPIO
import socket
from _thread import *
import threading
# GPIO setup
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
servo_pin = 3 # Change this to the GPIO pin connected to the servo
GPIO.setup(servo_pin, GPIO.OUT)
servo = GPIO.PWM(servo_pin, 50) # PWM frequency = 50 Hz
servo.start(0)
ExitFlag = False
socket_lock = threading.Lock()
def move_servo(angle):
print("Move to angle", angle)
duty_cycle = (angle / 18) + 2
GPIO.output(servo_pin, True)
servo.ChangeDutyCycle(duty_cycle)
time.sleep(1)
GPIO.output(servo_pin, False)
servo.ChangeDutyCycle(0)
def socketThread(thesocket):
global ExitFlag
while True:
data = thesocket.recv(1024).decode('utf-8')
if not data:
socket_lock.release();
break
print("Received command:", data.strip())
if "stop" in data.lower():
ExitFlag=True
break
elif "move" in data.lower():
try:
angle = int(data.split()[-2])
move_servo(angle)
except ValueError:
print("Invalid angle value")
thesocket.close()
if __name__ == "__main__":
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server_socket.bind(("", 12345)) # Choose a port
server_socket.settimeout(1.0)
server_socket.listen(1)
print("server ready")
try:
while not ExitFlag:
try:
asocket , addr = server_socket.accept()
except socket.timeout:
continue
if socket_lock.acquire(True,1):
print('Connect to:',addr[0],':',addr[1])
start_new_thread(socketThread,(asocket,))
else:
print("Already connected. client ",addr[0],':',addr[1]," refused.")
asocket.close()
except KeyboardInterrupt:
pass
server_socket.close()