
Message Format

CloudCanal lets you select the message format used when synchronizing data with MQ. This article defines and describes the supported MQ message formats to make downstream consumption easier.

Message format introduction

Supported message synchronization formats

  • CloudCanal Json: CloudCanal's default message format. Incremental logs are parsed from the database and transmitted to Kafka; batch message transmission is supported.
  • Canal Json: A Canal-compatible format. Data is stored in the Canal Json format.
  • Aliyun DTS Avro: A data serialization format that transforms data structures or objects into a form suitable for storage or transmission.
  • Debezium Envelope: Debezium's official CDC message format. It carries SCHEMA information, which makes it friendly to downstream big data consumers.

Target end MQ support

| Message format | Kafka | RocketMQ | RabbitMQ |
| --- | --- | --- | --- |
| CloudCanal Json | Supported | Supported | Supported |
| Canal Json | Supported | Supported | Supported |
| Aliyun DTS Avro | Supported | - | - |
| Debezium Envelope | Supported | - | - |

Source end MQ support

| Message format | Kafka | RocketMQ | RabbitMQ |
| --- | --- | --- | --- |
| CloudCanal Json | Supported | Supported | Supported |
| Canal Json | Supported | Supported | Supported |
| Aliyun DTS Avro | - | - | - |
| Debezium Envelope | - | - | - |
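
The two support tables above can be captured as a small lookup, for example to check a format and MQ combination before configuring a task. This is only a sketch: `TARGET_SUPPORT`, `SOURCE_SUPPORT`, and `is_supported` are hypothetical names and not part of CloudCanal.

```python
# Hypothetical lookup built from the support tables above; not a CloudCanal API.
JSON_FORMATS = {"CloudCanal Json", "Canal Json"}
KAFKA_TARGET_ONLY = {"Aliyun DTS Avro", "Debezium Envelope"}

# Formats that can be written to each MQ when it is the target end.
TARGET_SUPPORT = {
    "Kafka": JSON_FORMATS | KAFKA_TARGET_ONLY,
    "RocketMQ": set(JSON_FORMATS),
    "RabbitMQ": set(JSON_FORMATS),
}

# Formats that can be read from each MQ when it is the source end.
SOURCE_SUPPORT = {mq: set(JSON_FORMATS) for mq in ("Kafka", "RocketMQ", "RabbitMQ")}


def is_supported(message_format: str, mq: str, as_source: bool = False) -> bool:
    """Return True if the format is listed as supported for the given MQ."""
    table = SOURCE_SUPPORT if as_source else TARGET_SUPPORT
    return message_format in table.get(mq, set())
```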

Message format details

CloudCanal Json

Parameter description:

| Parameter | Type | Description |
| --- | --- | --- |
| action | String | The type of operation, such as INSERT / UPDATE / DELETE. |
| bid | Long | The Batch Id of the BatchEventBuffer. |
| before | List | The data before the change. |
| data | List | The data of the current operation. |
| db | String | Database name. |
| schema | String | SCHEMA name. |
| table | String | Table name. |
| dbValType | Map | The data type name of each field. |
| jdbcType | Map | The JDBC data type of each field. |
| entryType | String | The source event type, such as ROWDATA / TRANSACTIONEND. |
| isDdl | Boolean | Whether it is a DDL operation. |
| pks | List | The primary key names of the source end. |
| execTs | Long | The time of SQL execution at the source end, a 13-digit Unix timestamp in milliseconds. |
| sendTs | Long | The time the operation was sent, a 13-digit Unix timestamp in milliseconds. |
| sql | String | The DDL statement executed at the source end. |
| tableChanges | Json | When the message is a DDL, it carries the metadata of the table, such as primary key and columns. |
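
Because `execTs` and `sendTs` are both millisecond Unix timestamps, a consumer can derive the time between source execution and message transmission directly from a message. The following is a minimal sketch; the helper names `send_delay_ms` and `to_utc` are hypothetical, and the sample values are taken from the DML example in this section.

```python
from datetime import datetime, timezone


def send_delay_ms(message: dict) -> int:
    """Milliseconds between SQL execution at the source (execTs) and sending (sendTs)."""
    return message["sendTs"] - message["execTs"]


def to_utc(ts_ms: int) -> datetime:
    """Convert a 13-digit millisecond Unix timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


sample = {"execTs": 1669789152000, "sendTs": 1669789153377}
delay = send_delay_ms(sample)          # 1377 ms
executed_at = to_utc(sample["execTs"])  # 2022-11-30 06:19:12 UTC
```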

DML operation example is as follows:

{
  "action":"INSERT/DELETE/UPDATE",
  "before":[
    // The before field of an UPDATE statement.
    {
      "col1":"22",
      "col2":"22",
      "col_pk":"22"
    }
  ],
  "bid":0,
  "data":[
    {
      "col1":"11",
      "col2":"11",
      "col_pk":"11"
    }
  ],
  "db":"db_test",
  "dbValType":{
    "col1":"varchar(22)",
    "col2":"varchar(22)",
    "col_pk":"varchar(22)"
  },
  "isDdl":false,
  "entryType":"ROWDATA",
  "execTs":1669789152000,
  "jdbcType":{
    "col1":12,
    "col2":12,
    "col_pk":12
  },
  "pks":[],
  "schema":"db_test",
  "sendTs":1669789153377,
  "sql":"",
  "table":"table_test"
}
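
A downstream consumer might walk the rows of such a message as sketched below. This is an illustration only: `iter_row_changes` is a hypothetical helper, and it assumes that for an UPDATE the entries of `before` and `data` line up by index, which this article does not state explicitly.

```python
import json


def iter_row_changes(raw: str):
    """Yield (action, before_row, after_row) for every row in a DML message."""
    msg = json.loads(raw)
    if msg.get("isDdl"):
        return  # DDL messages are skipped here; they carry no row data
    before = msg.get("before") or []
    data = msg.get("data") or []
    # Assumption: before[i] and data[i] describe the same row.
    for i in range(max(len(before), len(data))):
        old = before[i] if i < len(before) else None
        new = data[i] if i < len(data) else None
        yield msg["action"], old, new


# Sample UPDATE message (plain JSON, without the explanatory comments).
raw = json.dumps({
    "action": "UPDATE",
    "before": [{"col1": "22", "col2": "22", "col_pk": "22"}],
    "data": [{"col1": "11", "col2": "11", "col_pk": "11"}],
    "isDdl": False,
    "db": "db_test",
    "table": "table_test",
})
changes = list(iter_row_changes(raw))
```

Note that the `//` comments in the example above are explanatory; a strict JSON parser such as Python's `json` module rejects them.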

DDL operation example is as follows:

{
  "action":"ALTER",
  "before":[],
  "bid":0,
  "data":[],
  "db":"db_test",
  "dbValType":{
    "col1":"varchar(22)",
    "col2":"varchar(22)",
    "col_pk":"varchar(22)"
  },
  "isDdl":true,
  "entryType":"ROWDATA",
  "execTs":1669789188000,
  "jdbcType":{
    "col1":12,
    "col2":12,
    "col_pk":12
  },
  "pks":[],
  "schema":"db_test",
  "sendTs":1669789189533,
  "sql":"alter table table_test add col2 varchar(22) null",
  "table":"table_test",
  "tableChanges":{
    "table":{
      "columns":[
        {
          "jdbcType":12, // Jdbc type.
          "name":"col1", // Column name.
          "position":0, // Column order.
          "typeExpression":"varchar(22)", // Type annotation.
          "typeName":"varchar" // Type name.
        },
        {
          "jdbcType":12,
          "name":"col2",
          "position":1,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        },
        {
          "jdbcType":12,
          "name":"col_pk",
          "position":2,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        }
      ],
      "primaryKeyColumnNames":["col_pk"] // The name list of primary key.
    },
    "type":"ALTER"
  }
}
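
The `tableChanges` block gives a consumer enough information to refresh its own view of the table after a DDL. A minimal sketch follows, assuming the message has already been parsed into a dict; `describe_table` is a hypothetical helper name.

```python
def describe_table(msg: dict) -> dict:
    """Summarize the table metadata carried by a CloudCanal Json DDL message."""
    table = msg["tableChanges"]["table"]
    # Order columns by their declared position rather than by list order.
    columns = sorted(table["columns"], key=lambda col: col["position"])
    return {
        "name": f'{msg["db"]}.{msg["table"]}',
        "columns": [(col["name"], col["typeExpression"]) for col in columns],
        "primary_key": list(table["primaryKeyColumnNames"]),
    }


ddl_msg = {
    "action": "ALTER",
    "isDdl": True,
    "db": "db_test",
    "table": "table_test",
    "tableChanges": {
        "table": {
            "columns": [
                {"jdbcType": 12, "name": "col2", "position": 1,
                 "typeExpression": "varchar(22)", "typeName": "varchar"},
                {"jdbcType": 12, "name": "col1", "position": 0,
                 "typeExpression": "varchar(22)", "typeName": "varchar"},
            ],
            "primaryKeyColumnNames": ["col1"],
        },
        "type": "ALTER",
    },
}
summary = describe_table(ddl_msg)
```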

Canal Json

Parameter description:

| Parameter | Type | Description |
| --- | --- | --- |
| type | String | The operation type, e.g. INSERT / UPDATE / DELETE. |
| id | Long | The sequence number of the operation. |
| old | List | The data before the change. |
| data | List | The data of the current operation. |
| database | String | The database name. |
| table | String | The table name. |
| mysqlType | Map | The field data type names. |
| sqlType | Map | The field JDBC data types. |
| isDdl | Boolean | Whether it is a DDL operation. |
| pkNames | List | The primary key names of the source end. |
| es | Long | The time of the SQL execution at the source end, Unix timestamp in milliseconds. |
| ts | Long | The time when the operation was sent, Unix timestamp in milliseconds. |
| sql | String | The DDL statement executed at the source end. |
| tableChanges | Json | When the message is a DDL, it carries the meta information of the table, such as primary key and columns. |

DML operation example is as follows:

{
  "data":[
    {
      "col1":"11",
      "col2":"11",
      "col_pk":"11"
    }
  ],
  "database":"db_test",
  "es":1669790847000,
  "id":0,
  "isDdl":false,
  "mysqlType":{
    "col1":"varchar(22)",
    "col2":"varchar(22)",
    "col_pk":"varchar(22)"
  },
  "old":[
    // The old field for an UPDATE type.
    {
      "col1":"22",
      "col2":"22",
      "col_pk":"22"
    }
  ],
  "pkNames":["col_pk"],
  "sql":"",
  "sqlType":{
    "col1":12,
    "col2":12,
    "col_pk":12
  },
  "table":"table_test",
  "ts":1669790848072,
  "type":"INSERT/DELETE/UPDATE"
}
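
For an UPDATE, comparing `old` with `data` shows which columns actually changed. The sketch below is illustrative: `changed_columns` is a hypothetical helper, and it assumes that `old[i]` and `data[i]` refer to the same row.

```python
def changed_columns(msg: dict) -> list:
    """For each updated row, map column name -> (old value, new value)."""
    diffs = []
    # Assumption: old and data are aligned by index.
    for old_row, new_row in zip(msg.get("old") or [], msg.get("data") or []):
        diffs.append({
            col: (old_val, new_row.get(col))
            for col, old_val in old_row.items()
            if new_row.get(col) != old_val
        })
    return diffs


update_msg = {
    "type": "UPDATE",
    "isDdl": False,
    "database": "db_test",
    "table": "table_test",
    "pkNames": ["col_pk"],
    "old": [{"col1": "22", "col2": "11", "col_pk": "11"}],
    "data": [{"col1": "11", "col2": "11", "col_pk": "11"}],
}
diffs = changed_columns(update_msg)
```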

DDL operation example is as follows:

{
  "data":[],
  "database":"db_test",
  "es":1669790951000,
  "id":0,
  "isDdl":true,
  "mysqlType":{
    "col1":"varchar(22)",
    "col2":"varchar(22)",
    "col_pk":"varchar(22)"
  },
  "old":[],
  "pkNames":[],
  "sql":"alter table table_test add col2 varchar(22) null",
  "sqlType":{
    "col1":12,
    "col2":12,
    "col_pk":12
  },
  "table":"table_test",
  "tableChanges":{
    "table":{
      "columns":[
        {
          "jdbcType":12, // Jdbc type.
          "name":"col1", // Column name.
          "position":0, // Column order.
          "typeExpression":"varchar(22)", // Type annotation.
          "typeName":"varchar" // Type name.
        },
        {
          "jdbcType":12,
          "name":"col2",
          "position":1,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        },
        {
          "jdbcType":12,
          "name":"col_pk",
          "position":2,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        }
      ],
      "primaryKeyColumnNames":["col_pk"] // The name list of primary key.
    },
    "type":"ALTER"
  },
  "ts":1669790952584,
  "type":"ALTER"
}
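
Judging by the matching descriptions in the two parameter tables, most Canal Json fields have a CloudCanal Json counterpart under a different name. A consumer that handles both formats could normalize one into the other, as in this sketch. The mapping is inferred from the tables in this article and `rename_canal_fields` is a hypothetical helper; CloudCanal Json fields with no Canal Json counterpart in the tables (`schema`, `entryType`) are not synthesized.

```python
# Inferred from the parameter tables: Canal Json name -> CloudCanal Json name.
CANAL_TO_CLOUDCANAL = {
    "type": "action",
    "id": "bid",
    "old": "before",
    "database": "db",
    "mysqlType": "dbValType",
    "sqlType": "jdbcType",
    "pkNames": "pks",
    "es": "execTs",
    "ts": "sendTs",
}


def rename_canal_fields(msg: dict) -> dict:
    """Return a copy of a Canal Json message using CloudCanal Json field names."""
    # Fields shared by both formats (data, table, isDdl, sql, ...) keep their names.
    return {CANAL_TO_CLOUDCANAL.get(key, key): value for key, value in msg.items()}


canal_msg = {
    "type": "ALTER",
    "id": 0,
    "database": "db_test",
    "table": "table_test",
    "isDdl": True,
    "es": 1669790951000,
    "ts": 1669790952584,
    "sql": "alter table table_test add col2 varchar(22) null",
}
normalized = rename_canal_fields(canal_msg)
```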

Aliyun DTS Avro

Messages in this format must be parsed according to the DTS Avro SCHEMA definition. Refer to the DTS Avro SCHEMA definition for details.

Debezium Envelope

This message type consists mainly of a SCHEMA and a PAYLOAD. The SCHEMA is the metadata of the data, and the PAYLOAD contains the content of the changed records.
Refer to the official Debezium documentation for details on the SCHEMA definition.

Parameter description:

| Parameter | Type | Description |
| --- | --- | --- |
| op | String | The type of operation, e.g. c (INSERT), u (UPDATE), d (DELETE), a (ALTER). |
| ts_ms | Long | The time the operation was sent, a 13-digit Unix timestamp in milliseconds. |
| after | Json | The data after the change. |
| before | Json | The data before the change. |
| source | Json | Meta information of the event, e.g. db, table. |
| ddl | String | The DDL statement executed at the source. |
| tableChanges | Json | The table metadata, such as primary key and columns, carried by the message in case of a DDL. |

DML operation example is as follows:

{
  "schema":...,
  "payload":{
    "op":"c",
    "ts_ms":1669796261933,
    "after":{
      "col1":"11",
      "col2":"11",
      "col_pk":"11"
    },
    "before":{},
    "source":{
      "ts_ms":1669796261933,
      "db":"db_test",
      "table":"table_test",
      "connector":"MySQL",
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "server_id": 223344,
      ...
    }
  }
}
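
A consumer typically translates the `op` code and reads the row images from the payload. The following is a minimal sketch based on the parameter table above; `OP_NAMES` and `summarize_change` are hypothetical names, and an `op` value not listed in the table is passed through unchanged.

```python
# Operation codes as listed in the parameter description.
OP_NAMES = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "a": "ALTER"}


def summarize_change(envelope: dict) -> dict:
    """Extract the operation, table, and row images from an envelope's payload."""
    payload = envelope["payload"]
    source = payload.get("source") or {}
    op = payload.get("op")
    return {
        "operation": OP_NAMES.get(op, op),  # unknown codes are returned as-is
        "table": f'{source.get("db")}.{source.get("table")}',
        # An empty object (as in "before":{} for an insert) is reported as None.
        "before": payload.get("before") or None,
        "after": payload.get("after") or None,
    }


envelope = {
    "schema": None,  # schema content omitted in this sketch
    "payload": {
        "op": "c",
        "ts_ms": 1669796261933,
        "after": {"col1": "11", "col2": "11", "col_pk": "11"},
        "before": {},
        "source": {"db": "db_test", "table": "table_test", "connector": "MySQL"},
    },
}
change = summarize_change(envelope)
```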

DDL operation example is as follows:

{
  "schema":...,
  "payload":{
    "databaseName":"db_test",
    "ddl":"alter table table_test add col2 varchar(22) null",
    "ts_ms":1669797213247,
    "source":{
      "ts_ms":1669796261933,
      "db":"db_test",
      "table":"table_test",
      "connector":"MySQL",
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "server_id": 223344,
      ...
    },
    "tableChanges":{
      "type":"ALTER",
      "table":{
        "columns":[
          {
            "jdbcType":12, // Jdbc type.
            "name":"col1", // Column name.
            "position":0, // Column order.
            "typeExpression":"varchar(22)", // Type annotation.
            "typeName":"varchar" // Type name.
          },
          {
            "jdbcType":12,
            "name":"col2",
            "position":1,
            "typeExpression":"varchar(22)",
            "typeName":"varchar"
          },
          {
            "jdbcType":12,
            "name":"col_pk",
            "position":2,
            "typeExpression":"varchar(22)",
            "typeName":"varchar"
          }
        ],
        "primaryKeyColumnNames":["col_pk"] // The name list of primary key.
      }
    }
  }
}
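
In the two examples, the DDL payload carries a `ddl` field while the DML payload carries `op`, `before`, and `after`. A consumer could use that difference to route messages, as sketched below. This is an assumption drawn from the examples in this article, not a documented rule, and `classify_payload` is a hypothetical helper.

```python
def classify_payload(envelope: dict) -> tuple:
    """Return ("DDL", statement) or ("DML", op code) for an envelope."""
    payload = envelope["payload"]
    # Assumption: only DDL payloads carry a "ddl" field, as in the examples.
    if "ddl" in payload:
        return "DDL", payload["ddl"]
    return "DML", payload.get("op")


ddl_envelope = {
    "schema": None,  # schema content omitted in this sketch
    "payload": {
        "databaseName": "db_test",
        "ddl": "alter table table_test add col2 varchar(22) null",
        "ts_ms": 1669797213247,
    },
}
dml_envelope = {"schema": None, "payload": {"op": "u", "before": {}, "after": {}}}

ddl_kind = classify_payload(ddl_envelope)
dml_kind = classify_payload(dml_envelope)
```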