receiveFromOpcUa
Source processor that reads tags (value and status) from an OPC UA server.
The processor can operate either in polling or subscription mode. The mode is chosen by either setting one of the poll*
properties or the subscriptionRequestedPublishingIntervalMillis
property.
Each tag is identified by a namespace and ID. The consumer may be configured to poll the OPC UA server at a regular interval or to subscribe to changes on specific tags. Multiple tags (i.e., namespace and ID) may be specified.
The source is a singleton. To set up multiple consumers, multiple flows are required. This is useful if you want to support a large number of tags by having multiple consumers process a sub-set each. If you have a large amount (i.e., thousands or more) of tags to monitor, shard them across multiple flows. For example, clone a flow and give each of the clones a thousand tags to monitor.
The values read from the OPC UA server are not part of a transaction. They are delivered on a best-effort basis, meaning that you can get readings from one tag but not another in the same polling round.
Properties
Name | Summary |
---|---|
|
Adds a string of format
|
|
Adds each line of a string (separated by
For example: 2:HelloWorld/ScalarTypes/Int16:String 2:HelloWorld/ScalarTypes/Int32:String 2:HelloWorld/ScalarTypes/UtcTime:String |
|
Key from the server configuration used to look up the credentials needed to connect to the OPC UA server. Optional. |
|
The connection URL for the OPC UA server. |
|
The fixed rate poll interval (in milliseconds) to use when polling the OPC UA server for new tag values. The data is polled immediately on startup and then precisely at the rate specified. This, |
|
The fixed delay poll interval (in milliseconds) to use when polling the OPC UA server for new tag values. The data is polled immediately on startup and then with the given delay added after each processing has completed. This, |
|
The poll interval (written as a Quartz Cron Trigger) to use when polling the OPC UA server for new tag values. The polling will happen at the specified times described by the cron expression. This, |
|
Requested interval (in milliseconds) in which the subscription on the OPC UA server should publish notifications to this consumer. Publishing will happen if there was sampling since the last message was published, and the monitored value changed. The server might publish at a different interval if the requested interval cannot be used (i.e., the interval is too short). For example, the requested publishing interval could be set to
If the requested interval is less than or equal to 0, the OPC UA server will use the fastest supported publishing interval. Setting this to If this property is set, none of the |
|
Optional interval in which data should be sampled from a tag by the OPC UA server. A value of Please note that this only sets the sampling rate. The rate at which it is published back to this consumer is set by the property If this property is set, none of the |
|
Length of the queue that will keep sampled values from a tag. Rate of sampling is set by Optional, but if not set (or set to If this property is set, none of the |
|
The maximum number of notifications that can be in one publish response to this consumer. If more tag values are changed during the publishing interval, they are batched together. A value of If not set, the default value of |
|
The request timeout (in milliseconds) to use when polling the OPC UA server for new tag values. |
|
Whether the consumer should actually be polling messages from the OPC UA server. Defaults to |
|
Optional, descriptive name for the processor. |
|
Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern |
|
Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here. |
|
Whether the incoming payload is available for error processing on failure. Defaults to |
Sub-builders
Name | Summary |
---|---|
Strategy for describing the external system integration. Optional. |
|
Strategy for describing how a processor’s message is logged on the server. |
|
Strategy for archiving payloads. |
|
Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used. |
Details
Output Payload
Messages will be available to the flow in the form of a JSON compliant object described by the following data classes:
data class OpcTagReading(
val consumerReadTimeUtc: String,
val opcValues: List<OpcValue>
)
data class OpcValue(
val opcTag: OpcTag,
val value: String?,
val quality: QualityType,
val serverTimeUtc: String?,
val sourceTimeUtc: String?
)
data class OpcTag(
val namespace: Int,
val id: String
)
enum class QualityType { Good, Uncertain, Bad }
An example payload would look like the following:
{
"consumerReadTimeUtc": "2022-11-20T20:12:26.742563Z",
"opcValues": [
{
"opcTag": {
"namespace": 2,
"id": "HelloWorld/ScalarTypes/Int64"
},
"value": "64",
"quality": "Good",
"serverTimeUtc": "2022-11-20T20:12:26.739Z",
"sourceTimeUtc": "2022-11-20T20:12:21.511Z"
}
]
}
Authentication
The authenticationConfigKey
property supports secrets of type UserNameAndPassword.
See the Secret Types documentation for formatting details.
The processor does not support OPC UA security certificates.