receiveFromOpcUa

Source processor that reads tags (value and status) from an OPC UA server.

The processor can operate either in polling or subscription mode. The mode is chosen by either setting one of the poll* properties or the subscriptionRequestedPublishingIntervalMillis property.

Each tag is identified by a namespace and ID. The consumer may be configured to poll the OPC UA server at a regular interval or to subscribe to changes on specific tags. Multiple tags (i.e., namespace and ID) may be specified.

The source is a singleton. To set up multiple consumers, multiple flows are required. This is useful if you want to support a large number of tags by having multiple consumers process a sub-set each. If you have a large amount (i.e., thousands or more) of tags to monitor, shard them across multiple flows. For example, clone a flow and give each of the clones a thousand tags to monitor.

The values read from the OPC UA server are not part of a transaction. They are delivered on a best-effort basis, meaning that you can get readings from one tag but not another in the same polling round.

Properties

Name Summary

opcTag()

Adds a string of format namespace:identifier:identifierType to a list of OPC tags to be read. Values would be the following:

  • namespace: Tag namespace as an Int.

  • identifier: Tag identifier as a String.

  • identifierType: Tag identifier type as an enum ( Int, UInteger, String, UUID, or ByteString).

opcTagList()

Adds each line of a string (separated by \n) to the list of OPC tags to be read. Lines should be in the format namespace:identifier:identifierType, where the values adhere to the following:

  • namespace: Tag namespace as an Int.

  • identifier: Tag identifier as a String.

  • identifierType: Tag identifier type as an enum ( Int, UInteger, String, UUID, or ByteString).

For example:

2:HelloWorld/ScalarTypes/Int16:String
2:HelloWorld/ScalarTypes/Int32:String
2:HelloWorld/ScalarTypes/UtcTime:String

authenticationConfigKey

Key from the server configuration used to look up the credentials needed to connect to the OPC UA server. Optional.

url

The connection URL for the OPC UA server.

pollFixedRateMillis

The fixed rate poll interval (in milliseconds) to use when polling the OPC UA server for new tag values. The data is polled immediately on startup and then precisely at the rate specified. This, pollFixedDelayMillis, or pollCronExpression must be set for polling. Otherwise, one of the subscription properties must be set.

pollFixedDelayMillis

The fixed delay poll interval (in milliseconds) to use when polling the OPC UA server for new tag values. The data is polled immediately on startup and then with the given delay added after each processing has completed. This, pollFixedRateMillis, or pollCronExpression must be set for polling. Otherwise, one of the subscription properties must be set.

pollCronExpression

The poll interval (written as a Quartz Cron Trigger) to use when polling the OPC UA server for new tag values. The polling will happen at the specified times described by the cron expression. This, pollFixedRateMillis, or pollFixedDelayMillis must be set for polling. Otherwise, one of the subscription properties must be set.

subscriptionRequestedPublishingIntervalMillis

Requested interval (in milliseconds) in which the subscription on the OPC UA server should publish notifications to this consumer. Publishing will happen if there was sampling since the last message was published, and the monitored value changed. The server might publish at a different interval if the requested interval cannot be used (i.e., the interval is too short).

For example, the requested publishing interval could be set to 60_000, the sampling interval set to 10_000, and the queue size set to 6. The OPC UA server will pull a sample every 10 seconds and put the results on the queue of size

  1. Every minute, the results of up to 6 values are published to this consumer (or fewer if there was no change).

If the requested interval is less than or equal to 0, the OPC UA server will use the fastest supported publishing interval. Setting this to null will use the OPC UA server’s default setting.

If this property is set, none of the poll* properties must be set.

subscriptionSamplingIntervalMillis

Optional interval in which data should be sampled from a tag by the OPC UA server. A value of 0 indicates that the server should use the fastest practical rate.

Please note that this only sets the sampling rate. The rate at which it is published back to this consumer is set by the property subscriptionRequestedPublishingIntervalMillis (which is required for subscription mode).

If this property is set, none of the poll* properties must be set.

subscriptionQueueSize

Length of the queue that will keep sampled values from a tag. Rate of sampling is set by subscriptionSamplingIntervalMillis. If there are more values sampled than the queue length before subscriptionRequestedPublishingIntervalMillis triggers this consumer, the oldest values will start getting dropped from the queue.

Optional, but if not set (or set to 0), the default queue size of 1 is used, effectively disabling queueing.

If this property is set, none of the poll* properties must be set.

subscriptionMaxNotificationsPerPublish

The maximum number of notifications that can be in one publish response to this consumer. If more tag values are changed during the publishing interval, they are batched together. A value of 0 means there is no limit. 1 effectively disables batching.

If not set, the default value of 65535 is used. If this property is set, none of the poll* properties must be set.

requestTimeoutMillis

The request timeout (in milliseconds) to use when polling the OPC UA server for new tag values.

enabled

Whether the consumer should actually be polling messages from the OPC UA server. Defaults to true. Can be used to "pause" active consumers by updating the flow with this property disabled.

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.

exchangeProperties

Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

externalSystemDetails

Strategy for describing the external system integration. Optional.

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Output Payload

Messages will be available to the flow in the form of a JSON compliant object described by the following data classes:

data class OpcTagReading(
    val consumerReadTimeUtc: String,
    val opcValues: List<OpcValue>
)

data class OpcValue(
    val opcTag: OpcTag,
    val value: String?,
    val quality: QualityType,
    val serverTimeUtc: String?,
    val sourceTimeUtc: String?
)

data class OpcTag(
    val namespace: Int,
    val id: String
)

enum class QualityType { Good, Uncertain, Bad }

An example payload would look like the following:

{
  "consumerReadTimeUtc": "2022-11-20T20:12:26.742563Z",
  "opcValues": [
    {
      "opcTag": {
        "namespace": 2,
        "id": "HelloWorld/ScalarTypes/Int64"
      },
      "value": "64",
      "quality": "Good",
      "serverTimeUtc": "2022-11-20T20:12:26.739Z",
      "sourceTimeUtc": "2022-11-20T20:12:21.511Z"
    }
  ]
}

Authentication

The authenticationConfigKey property supports secrets of type UserNameAndPassword. See the Secret Types documentation for formatting details.

The processor does not support OPC UA security certificates.