screwdriver/PLC/IOcardQthread.py

358 lines
12 KiB
Python
Raw Permalink Normal View History

2025-02-06 16:10:58 +08:00
from PyQt5 import QtCore
from pyModbusTCP.client import ModbusClient
import time
import struct
from itertools import chain
def float32ToIEEE754Int(num):
convertNum = struct.unpack('<I',struct.pack('<f',num)) # 将浮点数按照4byte float转换
hex_convertNum = hex(convertNum[0])
hex_convertNum_Low = hex_convertNum[2:6]
hex_convertNum_High = hex_convertNum[6:]
dec_num_Low = 0
dec_num_High = 0
if(hex_convertNum_Low):
dec_num_Low = int(hex_convertNum_Low, 16)
if (hex_convertNum_High):
dec_num_High = int(hex_convertNum_High, 16)
return dec_num_High, dec_num_Low
class IOcard(QtCore.QThread):
rawdataState = QtCore.pyqtSignal(bool)
rawdataINPUT = QtCore.pyqtSignal(bool,bool,bool,bool,bool,bool,bool,bool,bool,bool)
rawdataMOTOR = QtCore.pyqtSignal(bool,bool,bool,bool,bool)
def __init__(self, SERVER_HOST,SERVER_PORT, dev_logger=None): # ,parent=None):
QtCore.QThread.__init__(self)
print('IOcard0509')
self.SERVER_HOST = SERVER_HOST
self.SERVER_PORT = SERVER_PORT
self.connect = False
self.running = False
self._outputFlag = False
self._outputBit = "0000"
self._client = None
self._goHomeFlag = False
self._dis = -310
# self._dis = 0
self._speed = 0
self._acceleration_up = 32767
self._acceleration_down = 32767
self._updataFlag = False
self.motor_idle_time = 0 # 設定馬達閒置平台移進去時間
self.dev_logger = dev_logger
self.MOTORSTATE = []
def IOCard_connect(self):
try:
self._client = ModbusClient(host=self.SERVER_HOST, port=self.SERVER_PORT, timeout=2)
if self._client.open():
self.connect = True
self.running = False
self.updataDisSpeed(310, 400)#出來
# print('PLC OK')
else:
self.connect = False
self.running = False
except Exception as e:
print(f'{e}\nPLC disconnect')
self.dev_logger.error('PLC連線失敗')
self.connect = False
self.running = False
def run(self):
while self.running and self.connect:
if(not self._outputFlag and not self._goHomeFlag and not self._updataFlag):
inputState,disAndSpeed,motorState = self._readInput()
if(inputState != None):
self.rawdataINPUT.emit(inputState[0],inputState[1],inputState[2],inputState[3],inputState[4],inputState[5],inputState[6],inputState[7],inputState[8],inputState[9])
self.rawdataMOTOR.emit(motorState[0],motorState[1],motorState[2],motorState[3],motorState[4])
self.MOTORSTATE = motorState
else:
print("PLC error")
self.dev_logger.error('PLC run error')
elif(self._updataFlag):
self._updataDisSpeed()
self._updataFlag = False
self.motor_idle_time = 0 # 設定馬達閒置平台移進去時間
elif(self._outputFlag):
self._switch(self._outputBit)
self._outputFlag =False
elif(self._goHomeFlag):
self._goHome()
self._goHomeFlag = False
self.motor_idle_time = 0 # 設定馬達閒置平台移進去時間
time.sleep(1 / 100)
# region IO
def _updataDisSpeed(self):
if self.connect:
acceleration_up = [float32ToIEEE754Int(self._acceleration_up)]
acceleration_up = list(chain.from_iterable(acceleration_up))
is_ok = self._client.write_multiple_registers(0x1F4, acceleration_up)
acceleration_down = [float32ToIEEE754Int(self._acceleration_down)]
acceleration_down = list(chain.from_iterable(acceleration_down))
is_ok = self._client.write_multiple_registers(0x1F6, acceleration_down)
disAndSpeed = [float32ToIEEE754Int(self._dis),float32ToIEEE754Int(self._speed)]
disAndSpeed = list(chain.from_iterable(disAndSpeed))
is_ok = self._client.write_multiple_registers(0x00, disAndSpeed)
# is_ok = self._client.write_multiple_registers(8195, [0X02])
# # is_ok = self._client.write_multiple_registers(8203, 1)
# # is_ok = self._client.write_multiple_registers(8202, 1)
# # is_ok = self._client.write_multiple_registers(8203, 1)
# print(is_ok)
# is_ok = self._client.write_multiple_registers(0x02, float32ToIEEE754Int(-1.2))
#更新
is_ok = self._client.write_multiple_coils(10, [1])
#動作
is_ok = self._client.write_multiple_coils(0, [1])
return is_ok
def _goHome(self):
print(self.connect)
if self.connect:
is_ok = self._client.write_multiple_coils(5, [1])
return is_ok
def _send0000(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 0, 0, 0, 0, 0, 0])
return is_ok
def _send0001(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 0, 1])
return is_ok
def _send0010(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 1, 0])
return is_ok
def _send0011(self):#detectEnd
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 1, 1, 0, 0, 0, 0, 0])
return is_ok
def _send0100(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 1, 0, 0])
return is_ok
def _send0101(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 1, 0, 1])
return is_ok
def _send0110(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 1, 1, 0])
return is_ok
def _send0111(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 1, 1, 1])
return is_ok
def _send1000(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 0, 0])
return is_ok
def _send1001(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 0, 1])
return is_ok
def _send1010(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 1, 0])
return is_ok
def _send1011(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 1, 1])
return is_ok
def _send1100(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 1, 0, 0])
return is_ok
def _send1101(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 1, 0, 1])
return is_ok
def _send1110(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 1, 1, 0])
return is_ok
def _send1111(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 1, 1, 1, 1, 1, 1, 1])
return is_ok
def _send00001111(self):#open all light
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 0, 0, 1, 1, 1, 1])
return is_ok
def _send00001011(self):#open AI light
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 0, 0, 1, 0, 1, 1])
return is_ok
def _send00001100(self):#open AOI light
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 0, 0, 1, 1, 0, 0])
return is_ok
def _send10001011(self):#detectStart_1 : open AI light Green Light
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 0, 0, 1, 0, 1, 1])
return is_ok
def _send10001100(self):#detectStart_1 : open AOI light Green Light
if self.connect:
is_ok = self._client.write_multiple_coils(20, [1, 0, 0, 0, 1, 1, 0, 0])
return is_ok
def _send01000000(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 1, 0, 0, 0, 0, 0, 0])
return is_ok
def _send00100000(self):
if self.connect:
is_ok = self._client.write_multiple_coils(20, [0, 0, 1, 0, 0, 0, 0, 0])
return is_ok
def _readInput(self):
response = self._client.read_coils(20, 10)
disAndSpeed = self._client.read_holding_registers(300,2)
if disAndSpeed[0] > 32767:
disAndSpeed[0] = disAndSpeed[0] - 65536
motorState = self._client.read_coils(0,8)
motorState = [motorState[1],motorState[2],motorState[3],motorState[6],motorState[7]]
return response,disAndSpeed,motorState
# endregion
def _switch(self,lang):
if lang == "0000":
self._send0000()
elif lang == "0001":
self._send0001()
elif lang == "0010":
self._send0010()
elif lang == "0011":
self._send0011()
elif lang == "0100":
self._send0100()
elif lang == "0101":
self._send0101()
elif lang == "0110":
self._send0110()
elif lang == "0111":
self._send0111()
elif lang == "1000":
self._send1000()
elif lang == "1001":
self._send1001()
elif lang == "1010":
self._send1010()
elif lang == "1011":
self._send1011()
elif lang == "1100":
self._send1100()
elif lang == "1101":
self._send1101()
elif lang == "1110":
self._send1110()
elif lang == "1111":
self._send1111()
elif lang == "00001111":
self._send00001111()
elif lang == "00001011":
self._send00001011()
elif lang == "00001100":
self._send00001100()
elif lang == "10001011":
self._send10001011()
elif lang == "10001100":
self._send10001100()
elif lang == "01000000":
self._send01000000()
elif lang == "00100000":
self._send00100000()
def openAll(self):
self._outputFlag = True
self._outputBit = "1111"
def closeAll(self):
self._outputFlag = True
self._outputBit = "0000"
def openBuzzer(self):
self._outputFlag = True
self._outputBit = "0001"
def openRedLight(self):
self._outputFlag = True
self._outputBit = "0010"
def openRedLight_closeDetctLight(self):
self._outputFlag = True
self._outputBit = "00100000"
def openYellowLight(self):
self._outputFlag = True
self._outputBit = "0100"
def openYellowLight_closeDetctLight(self):
self._outputFlag = True
self._outputBit = "01000000"
def openGreenLight(self):
self._outputFlag = True
self._outputBit = "1000"
def openAllLight(self):
self._outputFlag = True
self._outputBit = "00001111"
def openAILight(self):
self._outputFlag = True
self._outputBit = "00001011"
def openAOILight(self):
self._outputFlag = True
self._outputBit = "00001100"
def detectStart_1(self):
self._outputFlag = True
self._outputBit = "10001011"
def detectStart_2(self):
self._outputFlag = True
self._outputBit = "10001100"
def detectEndNG(self):
self._outputFlag = True
self._outputBit = "0011"
def goHome(self):
self._goHomeFlag = True
def updataDisSpeed(self, Dis,Speed):
# if self._dis + Dis == 0:
# self._dis = Dis
# self._speed = Speed
# self._updataFlag = True
self._dis = Dis
self._speed = Speed
self._updataFlag = True
def open(self):
if self.connect:
self.running = True
def stop(self):
if self.connect:
self.running = False
def close(self):
if self.connect:
self.running = False
time.sleep(1)
self._client.close()