from pyModbusTCP.client import ModbusClient
from pyModbusTCP import utils
from datetime import datetime
import time
# 100 Bits from PLC to PanelPC: 000601 - 000700
# 100 Bits from PanelPC to PLC: 000401 - 000500
# 100 16 bit Words from PLC to PanelPC: 400200 - 400299
# 100 16 bit Words from PanelPC to PLC: 400001 - 400100
# 100 32 bit floats from PLC to PanelPC: 401000 - 401199 (by twos)
# 100 32 bit floats from PanelPC to PLC: 400600 - 400799 (by twos)
class p3kModbus():
def __init__(self, ip="192.168.0.10", port=502, timeout = 5):
self.c = ModbusClient()
self.c.host(ip)
self.c.port(port)
self.c.timeout(timeout)
#self.c.debug(True)
#self.c.auto_open(True)
#self.c.auto_close(True)
self.bitsIn = []
self.bitsOut = []
self.bitsInStartingAddr = 601
self.bitsInLength = 100
self.bitsOutStartingAddr = 401
self.bitsOutLength = 100
self.wordsIn = []
self.wordsOut = []
self.wordsInStartingAddr = 201 # (400201)
self.wordsInLength = 100
self.wordsOutStartingAddr = 1 # (400001)
self.wordsOutLength = 100
self.floatsIn = []
self.floatsOut = []
self.floatsInStartingAddr = 1000 # (401000)
self.floatsInLength = 124 # anything bigger than 124 results in error
self.floatsOutStartingAddr = 600 # (400600)
self.floatsOutLength = 100
# managing TCP sessions with call to c.open()/c.close()
def start(self):
self.success = self.c.open()
if self.success == True:
return True
else:
return False
#raise Exception('Failed to connect to Click on Modbus TCP')
def readBits(self):
err = 0
self.bitsIn = []
#self.bitsOut = []
if self.c.is_open():
self.bitsIn = self.c.read_coils(self.bitsInStartingAddr-1, self.bitsInLength)
if self.bitsIn:
if len(self.bitsIn) < self.bitsInLength:
err = 1
raise Exception ("Failed to read all bits from P3000 on Modbus TCP")
else:
self.bitsIn.insert(0, "X") # offset the values by one since Click I/O starts at 1, not 0
else:
err = 1
raise Exception("No reply from P3000 on Modbus TCP")
else:
err = 1
raise Exception("No reply from P3000 on Modbus TCP")
if err == 0 :
return True
else:
return False
def readWords(self):
err = 0
#self.wordsIn = []
# self.bitsOut = []
if self.c.is_open():
self.wordsIn = self.c.read_holding_registers(self.wordsInStartingAddr-1, self.wordsInLength)
if self.wordsIn:
#print(self.wordsIn)
#print(len(self.wordsIn))
if len(self.wordsIn) < self.wordsInLength:
err = 1
raise Exception("Failed to read all bits from P3000 on Modbus TCP")
else:
self.wordsIn.insert(0, "X") # offset the values by one since Click I/O starts at 1, not 0
else:
err = 1
raise Exception("No reply from P3000 on Modbus TCP")
else:
err = 1
raise Exception("No reply from P3000 on Modbus TCP")
if err == 0:
return True
else:
return False
def readFloats(self):
err = 0
if self.c.is_open():
self.floatsIn = self.c.read_holding_registers(self.floatsInStartingAddr -1, self.floatsInLength)
if self.floatsIn:
if len(self.floatsIn) < self.floatsInLength:
err = 1
raise Exception("Failed to read all floats from P3000 on Modbus TCP")
else:
floatlist = []
for i in utils.word_list_to_long(self.floatsIn,False):
floatlist.append(utils.decode_ieee(i))
self.floatsIn = []
self.floatsIn = floatlist
self.floatsIn.insert(0, "X") # offset the values by one since Click I/O starts at 1, not 0
else:
err = 1
raise Exception("1No reply from P3000 on Modbus TCP")
else:
err = 1
raise Exception("2No reply from P3000 on Modbus TCP")
if err == 0:
return True
else:
return False
def writeSingleFloat(self, which, val):
if self.c.is_open():
addr = self.floatsOutStartingAddr + (which * 2) - 3
floats_list = [val]
b32_l = [utils.encode_ieee(f) for f in floats_list]
b16_l = utils.long_list_to_word(b32_l,False)
success = self.c.write_multiple_registers(addr, b16_l)
if success != True:
raise Exception("Failed to write float to P3k on Modbus TCP")
else:
print("closed!")
def writeSingleBit(self, which, state):
if self.c.is_open():
addr = self.bitsOutStartingAddr + which - 2
success = self.c.write_single_coil(addr,state)
if success != True:
raise Exception("Failed to write single bit to Click on Modbus TCP")
else:
print("closed!")
def writeSingleWord(self, which, val):
if val <= 65535 and val >=0:
if self.c.is_open():
addr = self.wordsOutStartingAddr + which - 2
success = self.c.write_single_register(addr,val)
if success != True:
raise Exception("Failed to write single word to P3k on Modbus TCP")
else:
print("closed!")
else:
raise Exception("Invalid value (",val,"). Unsigned single integers must be between 0 and 65535")
def writeAllBits(self, outputs=[]):
if self.c.is_open():
if len(outputs) != 6:
raise Exception("Must specify exactly 6 outputs")
success = self.c.write_multiple_coils(8192,outputs)
if success != True:
raise Exception("Failed to write all bits to Click on Modbus TCP")
else:
print("closed!")
def dateAndTime(self):
now = datetime.now() # current date and time
y = int(now.strftime("%Y"))
mo = int(now.strftime("%m"))
d = int(now.strftime("%d"))
h = int(now.strftime("%H"))
mi = int(now.strftime("%M"))
s = int(now.strftime("%S"))
return (y,mo,d,h,mi,s)
def closeConnection(self):
if self.c.close() == True:
return True
else:
raise Exception("No TCP connection exists to be closed")
return False
if __name__ == "__main__":
count = 0
myConn = p3kModbus(ip="192.168.1.10")
timestamp = time.time()
def sendDT():
dt = myConn.dateAndTime()
for i in range(0,len(dt)):
myConn.writeSingleWord(i+10,dt[i])
print(dt[i])
if myConn.start() == True:
sendDT()
demo1 = False
#demo = True
if demo1 == True:
while myConn.start() == True:
time.sleep(1)
if myConn.readBits() == True:
myBit = myConn.bitsIn[99]
if myBit == False:
myConn.writeSingleBit(99, True)
else:
myConn.writeSingleBit(99, False)
if myConn.readWords() == True:
myWord = myConn.wordsIn[3]
#print(myWord)
myConn.writeSingleWord(3,myWord+1)
if myConn.readFloats() == True:
myFloat = myConn.floatsIn[7]
myConn.writeSingleFloat(7, myFloat + 1.39)
count+=1
elapsed = str(round((time.time()-timestamp)*1000))+"mS"
timestamp = time.time()
print("cycle #:", count,"cycle time:", elapsed, ", Bit 99:",myBit,", Word 3:",myWord,"Float 7:",myFloat)
demo2 = False
#demo = True
if demo2 == True:
while myConn.start() == True and count<20:
#time.sleep(1)
if myConn.readBits() == True:
myBit = myConn.bitsIn[10]
if myBit == False:
myConn.writeSingleBit(10, True)
else:
myConn.writeSingleBit(10, False)
if myConn.readWords() == True:
myWord = myConn.wordsIn[3]
myConn.writeSingleWord(3,myWord-1)
if myConn.readFloats() == True:
myFloat = myConn.floatsIn[7]
myConn.writeSingleFloat(7, myFloat - 1.39)
count+=1
elapsed = str(round((time.time()-timestamp)*1000))+"mS"
timestamp = time.time()
print("cycle #:", count,"cycle time:", elapsed, ", Bit 10:",myBit,", Word 3:",myWord,"Float 7:",myFloat)
#myConn.writeAllBits([0, 1, 0, 0, 0, 0]) #coils set by ladder ovveride the modbus command