Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
osie
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
osie
Commits
d99523d1
Commit
d99523d1
authored
Jun 17, 2021
by
Ivan Tyagov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Not needed.
parent
d3ae3dba
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
0 additions
and
96 deletions
+0
-96
eggs/osie_coupler/custom_message.py
eggs/osie_coupler/custom_message.py
+0
-96
No files found.
eggs/osie_coupler/custom_message.py
deleted
100644 → 0
View file @
d3ae3dba
#!/usr/bin/env python
"""
Pymodbus Synchronous Client Examples
--------------------------------------------------------------------------
The following is an example of how to use the synchronous modbus client
implementation from pymodbus.
It should be noted that the client can also be used with
the guard construct that is available in python 2.5 and up::
with ModbusClient('127.0.0.1') as client:
result = client.read_coils(1,10)
print result
"""
import
struct
# --------------------------------------------------------------------------- #
# import the various server implementations
# --------------------------------------------------------------------------- #
from
pymodbus.pdu
import
ModbusRequest
,
ModbusResponse
,
ModbusExceptions
from
pymodbus.client.sync
import
ModbusTcpClient
as
ModbusClient
from
pymodbus.bit_read_message
import
ReadCoilsRequest
from
pymodbus.compat
import
int2byte
,
byte2int
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
import
logging
logging
.
basicConfig
()
log
=
logging
.
getLogger
()
log
.
setLevel
(
logging
.
DEBUG
)
# --------------------------------------------------------------------------- #
# create your custom message
# --------------------------------------------------------------------------- #
# The following is simply a read coil request that always reads 16 coils.
# Since the function code is already registered with the decoder factory,
# this will be decoded as a read coil response. If you implement a new
# method that is not currently implemented, you must register the request
# and response with a ClientDecoder factory.
# --------------------------------------------------------------------------- #
class
CustomModbusResponse
(
ModbusResponse
):
function_code
=
55
_rtu_byte_count_pos
=
2
def
__init__
(
self
,
values
=
None
,
**
kwargs
):
ModbusResponse
.
__init__
(
self
,
**
kwargs
)
self
.
values
=
values
or
[]
def
encode
(
self
):
""" Encodes response pdu
:returns: The encoded packet message
"""
result
=
int2byte
(
len
(
self
.
values
)
*
2
)
for
register
in
self
.
values
:
result
+=
struct
.
pack
(
'>H'
,
register
)
return
result
def
decode
(
self
,
data
):
""" Decodes response pdu
:param data: The packet data to decode
"""
byte_count
=
byte2int
(
data
[
0
])
self
.
values
=
[]
for
i
in
range
(
1
,
byte_count
+
1
,
2
):
self
.
values
.
append
(
struct
.
unpack
(
'>H'
,
data
[
i
:
i
+
2
])[
0
])
class
CustomModbusRequest
(
ModbusRequest
):
function_code
=
55
_rtu_frame_size
=
8
def
__init__
(
self
,
address
=
None
,
**
kwargs
):
ModbusRequest
.
__init__
(
self
,
**
kwargs
)
self
.
address
=
address
self
.
count
=
16
def
encode
(
self
):
return
struct
.
pack
(
'>HH'
,
self
.
address
,
self
.
count
)
def
decode
(
self
,
data
):
self
.
address
,
self
.
count
=
struct
.
unpack
(
'>HH'
,
data
)
def
execute
(
self
,
context
):
if
not
(
1
<=
self
.
count
<=
0x7d0
):
return
self
.
doException
(
ModbusExceptions
.
IllegalValue
)
if
not
context
.
validate
(
self
.
function_code
,
self
.
address
,
self
.
count
):
return
self
.
doException
(
ModbusExceptions
.
IllegalAddress
)
values
=
context
.
getValues
(
self
.
function_code
,
self
.
address
,
self
.
count
)
return
CustomModbusResponse
(
values
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment