Pymodbus 服务器写入回调
Pymodbus Server write callback
我正在使用 pymodbus 设置 ModbusServer。
我通过串行端口读取数据列表并将其写入服务器的数据存储区。
这样我就可以（例如在 ModbusPoll 中）看到这些数据。
现在我想向我的 Modbus 服务器写回数据点，例如使用功能码 16 向数据点 1 写入值 10。
是否有任何回调或方法来捕获哪个地址和值已更改?
非常感谢
class CustomDataBlock(ModbusSparseDataBlock):
    """Sparse data block that prints every write made through ``setValues``.

    ``setValues`` acts as the write hook: it stores the data and then reports
    it. ``setValuesInternal`` stores data without reporting, for updates that
    originate in this program rather than from a Modbus client.
    """

    def setValues(self, address, value):
        """Store ``value`` at ``address`` and print what was written.

        Args:
            address: Start address, forwarded unchanged to the parent class.
            value: Value(s) to store, forwarded unchanged to the parent class.
        """
        super(CustomDataBlock, self).setValues(address, value)
        print("wrote {} to {}".format(value, address))

    def setValuesInternal(self, address, value):
        """Store ``value`` at ``address`` without printing anything.

        Args:
            address: Start address, forwarded unchanged to the parent class.
            value: Value(s) to store, forwarded unchanged to the parent class.
        """
        # Call the base-class implementation directly on this instance so the
        # print in the overriding setValues above is skipped.
        ModbusSparseDataBlock.setValues(self, address, value)
def updating_writer(context):
    """Push the latest serial readings into the datastore silently.

    Args:
        context: Server context; ``context[0]`` is used as the slave context.
    """
    # setValuesInternal is a method of CustomDataBlock, not of the slave
    # context, so the block has to be reached through the slave's store.
    # 'h' selects the holding-register table, which is what function code 16
    # (write multiple registers) targets.
    # NOTE(review): block address 1 is assumed to correspond to protocol
    # address 0, because a slave context that is not in zero mode shifts
    # addresses by one - confirm against the pymodbus version in use.
    # NOTE(review): valueDataAct is not defined in the visible code; it is
    # presumably a module-level list filled by the serial reader - verify.
    context[0].store['h'].setValuesInternal(1, valueDataAct)  # update Datastore with Serial values
def run_updating_server():
    """Create the datastore, schedule the periodic writer and start serving.

    A single ``CustomDataBlock`` of 378 zeroed entries backs all four tables
    of the slave context. ``updating_writer`` is scheduled once per second
    (first call after one interval), then the TCP server is started on
    port 502 of all interfaces.
    """
    # One block instance is shared by every table of the slave context.
    shared_block = CustomDataBlock([0] * 378)
    slave_store = ModbusSlaveContext(
        di=shared_block,
        co=shared_block,
        hr=shared_block,
        ir=shared_block,
    )
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    interval_seconds = 1  # in seconds
    writer_loop = LoopingCall(updating_writer, context=server_context)
    writer_loop.start(interval_seconds, now=False)

    StartTcpServer(server_context, address=('0.0.0.0', 502))
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_updating_server()
您可以通过实现自定义数据块来做到这一点——请参阅文档中的示例。然后您可以在 setValues 中做任何您想做的事；示例中的相关部分是：
# --------------------------------------------------------------------------- #
# import the modbus libraries we need
# --------------------------------------------------------------------------- #
from pymodbus.version import version
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSparseDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusRtuFramer, ModbusAsciiFramer
# --------------------------------------------------------------------------- #
# import the twisted libraries we need
# --------------------------------------------------------------------------- #
from twisted.internet.task import LoopingCall
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
# Install the default handler on the root logger (does nothing if the root
# logger already has handlers).
logging.basicConfig()
# Called without a name, getLogger() returns the root logger.
log = logging.getLogger()
# Let DEBUG and higher records through the root logger.
log.setLevel(logging.DEBUG)
class CustomDataBlock(ModbusSparseDataBlock):
    """Sparse data block with a reporting and a silent write path.

    ``setValues`` stores the data and prints it; ``setValuesInternal`` stores
    the data through the parent implementation only, so nothing is printed.
    """

    def setValues(self, address, value):
        """Write ``value`` at ``address`` via the parent class, then print it."""
        super(CustomDataBlock, self).setValues(address, value)
        message = "wrote {} to {}".format(value, address)
        print(message)

    def setValuesInternal(self, address, value):
        """Write ``value`` at ``address`` via the parent class without printing."""
        # Going straight to the parent implementation skips the print above.
        parent_set_values = ModbusSparseDataBlock.setValues
        parent_set_values(self, address, value)
def updating_writer(context):
    """Overwrite one datastore entry silently and print it before and after.

    Args:
        context: Server context; ``context[0]`` is used as the slave context.
    """
    before = context[0].getValues(4, 1, 1)
    print("value before update: " + str(before))

    # update Datastore with Serial values
    target_block = context[0].store['d']
    target_block.setValuesInternal(2, [251])

    after = context[0].getValues(4, 1, 1)
    print("value after update: " + str(after))
def run_updating_server():
    """Create the datastore, schedule the periodic writer and start serving.

    A single ``CustomDataBlock`` of 378 zeroed entries backs all four tables
    of the slave context. ``updating_writer`` is scheduled once per second
    (first call after one interval), then the TCP server is started on
    port 502 of all interfaces.
    """
    # One block instance is shared by every table of the slave context.
    shared_block = CustomDataBlock([0] * 378)
    slave_store = ModbusSlaveContext(
        di=shared_block,
        co=shared_block,
        hr=shared_block,
        ir=shared_block,
    )
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    interval_seconds = 1  # in seconds
    writer_loop = LoopingCall(updating_writer, context=server_context)
    writer_loop.start(interval_seconds, now=False)

    StartTcpServer(server_context, address=('0.0.0.0', 502))
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_updating_server()
我正在使用 pymodbus 设置 ModbusServer。 我通过串行端口读取数据列表并将其写入服务器的数据存储区。 这样我就可以（例如在 ModbusPoll 中）看到这些数据。
现在我想向我的 Modbus 服务器写回数据点，例如使用功能码 16 向数据点 1 写入值 10。
是否有任何回调或方法来捕获哪个地址和值已更改?
非常感谢
class CustomDataBlock(ModbusSparseDataBlock):
    """Sparse data block that prints every write made through ``setValues``.

    ``setValues`` acts as the write hook: it stores the data and then reports
    it. ``setValuesInternal`` stores data without reporting, for updates that
    originate in this program rather than from a Modbus client.
    """

    def setValues(self, address, value):
        """Store ``value`` at ``address`` and print what was written.

        Args:
            address: Start address, forwarded unchanged to the parent class.
            value: Value(s) to store, forwarded unchanged to the parent class.
        """
        super(CustomDataBlock, self).setValues(address, value)
        print("wrote {} to {}".format(value, address))

    def setValuesInternal(self, address, value):
        """Store ``value`` at ``address`` without printing anything.

        Args:
            address: Start address, forwarded unchanged to the parent class.
            value: Value(s) to store, forwarded unchanged to the parent class.
        """
        # Call the base-class implementation directly on this instance so the
        # print in the overriding setValues above is skipped.
        ModbusSparseDataBlock.setValues(self, address, value)
def updating_writer(context):
    """Push the latest serial readings into the datastore silently.

    Args:
        context: Server context; ``context[0]`` is used as the slave context.
    """
    # setValuesInternal is a method of CustomDataBlock, not of the slave
    # context, so the block has to be reached through the slave's store.
    # 'h' selects the holding-register table, which is what function code 16
    # (write multiple registers) targets.
    # NOTE(review): block address 1 is assumed to correspond to protocol
    # address 0, because a slave context that is not in zero mode shifts
    # addresses by one - confirm against the pymodbus version in use.
    # NOTE(review): valueDataAct is not defined in the visible code; it is
    # presumably a module-level list filled by the serial reader - verify.
    context[0].store['h'].setValuesInternal(1, valueDataAct)  # update Datastore with Serial values
def run_updating_server():
    """Create the datastore, schedule the periodic writer and start serving.

    A single ``CustomDataBlock`` of 378 zeroed entries backs all four tables
    of the slave context. ``updating_writer`` is scheduled once per second
    (first call after one interval), then the TCP server is started on
    port 502 of all interfaces.
    """
    # One block instance is shared by every table of the slave context.
    shared_block = CustomDataBlock([0] * 378)
    slave_store = ModbusSlaveContext(
        di=shared_block,
        co=shared_block,
        hr=shared_block,
        ir=shared_block,
    )
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    interval_seconds = 1  # in seconds
    writer_loop = LoopingCall(updating_writer, context=server_context)
    writer_loop.start(interval_seconds, now=False)

    StartTcpServer(server_context, address=('0.0.0.0', 502))
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_updating_server()
您可以通过实现自定义数据块来做到这一点——请参阅文档中的示例。然后您可以在 setValues 中做任何您想做的事；示例中的相关部分是：
# --------------------------------------------------------------------------- #
# import the modbus libraries we need
# --------------------------------------------------------------------------- #
from pymodbus.version import version
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSparseDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusRtuFramer, ModbusAsciiFramer
# --------------------------------------------------------------------------- #
# import the twisted libraries we need
# --------------------------------------------------------------------------- #
from twisted.internet.task import LoopingCall
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
# Install the default handler on the root logger (does nothing if the root
# logger already has handlers).
logging.basicConfig()
# Called without a name, getLogger() returns the root logger.
log = logging.getLogger()
# Let DEBUG and higher records through the root logger.
log.setLevel(logging.DEBUG)
class CustomDataBlock(ModbusSparseDataBlock):
    """Sparse data block with a reporting and a silent write path.

    ``setValues`` stores the data and prints it; ``setValuesInternal`` stores
    the data through the parent implementation only, so nothing is printed.
    """

    def setValues(self, address, value):
        """Write ``value`` at ``address`` via the parent class, then print it."""
        super(CustomDataBlock, self).setValues(address, value)
        message = "wrote {} to {}".format(value, address)
        print(message)

    def setValuesInternal(self, address, value):
        """Write ``value`` at ``address`` via the parent class without printing."""
        # Going straight to the parent implementation skips the print above.
        parent_set_values = ModbusSparseDataBlock.setValues
        parent_set_values(self, address, value)
def updating_writer(context):
    """Overwrite one datastore entry silently and print it before and after.

    Args:
        context: Server context; ``context[0]`` is used as the slave context.
    """
    before = context[0].getValues(4, 1, 1)
    print("value before update: " + str(before))

    # update Datastore with Serial values
    target_block = context[0].store['d']
    target_block.setValuesInternal(2, [251])

    after = context[0].getValues(4, 1, 1)
    print("value after update: " + str(after))
def run_updating_server():
    """Create the datastore, schedule the periodic writer and start serving.

    A single ``CustomDataBlock`` of 378 zeroed entries backs all four tables
    of the slave context. ``updating_writer`` is scheduled once per second
    (first call after one interval), then the TCP server is started on
    port 502 of all interfaces.
    """
    # One block instance is shared by every table of the slave context.
    shared_block = CustomDataBlock([0] * 378)
    slave_store = ModbusSlaveContext(
        di=shared_block,
        co=shared_block,
        hr=shared_block,
        ir=shared_block,
    )
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    interval_seconds = 1  # in seconds
    writer_loop = LoopingCall(updating_writer, context=server_context)
    writer_loop.start(interval_seconds, now=False)

    StartTcpServer(server_context, address=('0.0.0.0', 502))
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_updating_server()