注:图片建议在 PC 端查看
插入和删除 undo log 的逻辑都比较简单,直接操作数据表就行。这里重点看下回滚 undo log 的逻辑:
源码分析如下:
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;b
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
// 开启本地事务,确保删除undo log和恢复业务数据的SQL在一个事务中commit
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
// 查出branchId的所有undo log记录,用来恢复业务数据
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
// 如果state=1,说明可以回滚;state=1说明不能回滚
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
// 根据serializer获取序列化工具类
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
// 反序列化undo log,得到业务记录修改前后的明文
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
State.GlobalFinished.name());
}
} else {
// 如果不存在undo log,可能是因为分支事务还未执行完成(比如,分支事务执行超时),TM发起了回滚全局事务的请求。
// 这个时候,往undo_log表插入一条记录,可以使分支事务提交的时候失败(undo log)
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}
public interface UndoLogParser {
/**
* Get the name of parser;
*
* @return the name of parser
*/
String getName();
/**
* Get default context of this parser
*
* @return the default content if undo log is empty
*/
byte[] getDefaultContent();
/**
* Encode branch undo log to byte array.
*
* @param branchUndoLog the branch undo log
* @return the byte array
*/
byte[] encode(BranchUndoLog branchUndoLog);
/**
* Decode byte array to branch undo log.
*
* @param bytes the byte array
* @return the branch undo log
*/
BranchUndoLog decode(byte[] bytes);
}
public interface Executor<T> { /** * Execute t. * * @param args the args * @return the t * @throws Throwable the throwable */ T execute(Object... args) throws Throwable;}
注:图片建议在 PC 端查看
为了直观地看到不同类型的 SQL 生成的 before image SQL 和 after iamge SQL ,这里做个梳理。假如目标数据表的结构如下:
public interface Executor<T> {
/**
* Execute t.
*
* @param args the args
* @return the t
* @throws Throwable the throwable
*/
T execute(Object... args) throws Throwable;
}
注:图片建议在 PC 端查看 5.6 AsyncWorker
AsyncWorker 是用来做异步执行的,用来做分支事务提交和 undo log 记录删除等操作。
6、关于性能
并不存在某一种完美的分布式事务机制可以适应所有场景,完美满足所有需求。无论 AT 模式、TCC 模式还是 Saga 模式,本质上都是对 XA 规范在各种场景下安全性或者性能的不足的改进。Seata 不同的事务模式是在一致性、可靠性、易用性、性能四个特性之间进行不同的取舍。
近期 Seata 社区发现有同行,在未详细分析 Java 版本 AT 模式的代码的详细实现的情况下,仅对某个早期的 Go 版本的 Seata 进行短链接压测后,质疑 AT 模型的性能及其数据安全性,请具有一定思辨能力的用户朋友们在接受这个结论前仔细查阅其测试方法与测试对象,区分好 “李鬼” 与 “李逵”。
实际上,这个早期的 Go 版本实现仅参照了 Seata v1.4.0,且未严格把 Seata AT 模式的所有功能都予以实现。话说回来,即便其推崇的 Seata XA 模式,其也依赖于单 DB 的XA 模式。而当下最新版本的 MySQL XA 事务模式的 BUG 依然很多,这个地基并没有其想象中的那样百分百稳固。
由阿里与蚂蚁集团共建的 Seata,是我们多年内部分布式事务工程实践与技术经验的结晶,开源出来后得到了多达 150+ 以上行业同行生产环境的验证。开源大道既长且宽,这个道路上可以有机动车道也有非机动车道,还可以有人行道,大家携手把道路拓宽延长,而非站在人行道上宣传机动车道危险性高且车速慢。 7、总结
Seata AT 模式依赖于各个 DB 厂商的不同版本的 DB Driver(数据库驱动),每种数据库发布新版本后,其 SQL 语义及其使用模式都可能发生改变。随着近年 Seata 被其用户们广泛应用于多种业务场景,在开发者们的努力下,Seata AT 模式保持了编程接口与其 XA 模式几乎一致,适配了几乎所有的主流数据库,并覆盖了这些数据库的主要流行版本的 Driver:真正做到了把分布式系统的 “复杂性”留在了框架层面,把易用性和高性能交给了用户。
当然,Seata Java 版本的 XA 和 AT 模式还有许多需要完善与改进的地方,遑论其它多语言版本的实现。欢迎对 Seata 及其多语言版本建设感兴趣的同行参与到 Seata 的建设中来,共同努力把 Seata 打造成一个标准化分布式事务平台。 本周推荐阅读
Go内存泄漏,pprof 够用了么?
Go 原生插件使用问题全解析
Go 代码城市上云--KusionStack 实践
Seata-php 半年规划