Kafka Connect ExtractTimeStamp SMT Usage Reference for Confluent Cloudï
The ExtractTimeStamp Single Message Transform (SMT) is used to extract a specific time value from a field within a Kafka recordâs key or value and promote that value to the recordâs official timestamp metadata.
Note
The ExtractTimeStamp SMT is available only for managed Sink connectors.
To apply the ExtractTimeStamp SMT, add the following to your connector configuration.
{
"transforms" : "extractTimestamp",
"transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
"transforms.extractTimestamp.field.name" : "<TIMESTAMP_FIELD_NAME>"
}
Update the <TIMESTAMP_FIELD_NAME> placeholder to the name of the field containing the recordâs event time.
Examplesï
The example below shows how to use ExtractTimeStamp SMT:
Input:
{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Adding the SMT to your connector configuration:
To apply the
ExtractTimeStampSMT ontimestampfield, add the following to your connector configuration:{ "transforms" : "extractTimestamp", "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value", "transforms.extractTimestamp.field.name" : "timestamp" }
Output:
After the
ExtractTimeStampSMT applies, the value transforms as follows:{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestamp" : 1512164613123, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Propertiesï
Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| Specifies the field in the record that contains the timestamp value. The fieldâs value must be a timestamp or a 64-bit integer (int64). | STRING | HIGH |
Predicatesï
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.