使用R2DBC进行数据访问

R2DBC("Reactive Relational Database Connectivity")是一个由社区驱动的规范努力,旨在使用响应式模式标准化访问SQL数据库。

包层次结构

Spring Framework的R2DBC抽象框架由两个不同的包组成:

使用R2DBC核心类来控制基本的R2DBC处理和错误处理

本节介绍如何使用R2DBC核心类来控制基本的R2DBC处理,包括错误处理。它包括以下主题:

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心包中的中心类。它处理资源的创建和释放,有助于避免常见错误,比如忘记关闭连接。它执行核心 R2DBC 工作流程的基本任务(如语句创建和执行),留给应用代码提供 SQL 并提取结果。 DatabaseClient 类:

  • 运行 SQL 查询

  • 更新语句和存储过程调用

  • 执行 Result 实例的迭代

  • 捕获 R2DBC 异常并将其转换为在 org.springframework.dao 包中定义的通用、更具信息性的异常层次结构。 (请参阅 一致的异常层次结构。)

该客户端具有使用响应式类型进行声明式组合的功能性流畅 API。

当您在代码中使用 DatabaseClient 时,您只需要实现 java.util.function 接口,为它们提供明确定义的契约。给定由 DatabaseClient 类提供的 ConnectionFunction 回调创建一个 Publisher。提取 Row 结果的映射函数也是如此。

您可以通过直接实例化带有 ConnectionFactory 引用的 DatabaseClient,或者在 Spring IoC 容器中配置它并将其作为 bean 引用提供给 DAO,来在 DAO 实现中使用 DatabaseClient

创建 DatabaseClient 对象的最简单方法是通过静态工厂方法,如下所示:

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
应始终将 ConnectionFactory 配置为 Spring IoC 容器中的 bean。
DatabaseClient

DatabaseClient.builder() 获取一个 Builder 实例。您可以通过调用以下方法自定义客户端:

  • ….bindMarkers(…):提供特定的 BindMarkersFactory 来配置命名参数到数据库绑定标记的转换。

  • ….executeFunction(…):设置 ExecuteFunction 如何运行 Statement 对象。

  • ….namedParameters(false):禁用命名参数扩展。默认启用。

方言由 BindMarkersFactoryResolverConnectionFactory 解析,通常通过检查 ConnectionFactoryMetadata
您可以让 Spring 自动发现您的 BindMarkersFactory,方法是注册一个实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类,通过 META-INF/spring.factoriesBindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供程序实现。

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

DEBUG 级别记录。此外,每次执行都会在响应序列中注册一个检查点,以帮助调试。

DatabaseClient 使用示例。这些示例并非是 DatabaseClient 公开的所有功能的详尽列表。有关详细信息,请参阅附带的 javadoc

执行语句

DatabaseClient 提供了运行语句的基本功能。以下示例显示了创建新表所需的最小但完全功能的代码:

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient 设计用于方便、流畅的使用。它在执行规范的每个阶段暴露中间、继续和终端方法。上述示例使用 then() 返回一个完成的 Publisher,一旦查询完成(或者如果 SQL 查询包含多个语句,则为多个语句),它就会完成。

execute(…) 接受 SQL 查询字符串或查询 Supplier<String>,以推迟实际查询的创建直到执行。

查询 (SELECT)

SQL查询可以通过Row对象或受影响行数返回值。根据发出的查询,DatabaseClient可以返回更新的行数或行本身。

以下查询从表中获取idname列:

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

以下查询使用绑定变量:

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()
fetch()fetch()是一个继续运算符,允许您指定要消耗多少数据。

调用first()返回结果中的第一行并丢弃其余行。您可以使用以下运算符消耗数据:

  • first()返回整个结果的第一行。其Kotlin Coroutine变体对于非空返回值命名为awaitSingle(),如果值是可选的,则为awaitSingleOrNull()

  • one()返回确切的一个结果,如果结果包含更多行则失败。使用Kotlin Coroutines,对于确切的一个值使用awaitOne(),如果值可能为null则使用awaitOneOrNull()

  • all()返回结果的所有行。在使用Kotlin Coroutines时,请使用flow()

  • rowsUpdated()返回受影响的行数(INSERT/UPDATE/DELETE计数)。其Kotlin Coroutine变体命名为awaitRowsUpdated()

如果没有指定进一步的映射细节,查询将以Map的形式返回表格结果,其键是不区分大小写的列名,映射到其列值。

您可以通过提供一个Function<Row, T>来控制结果映射,该函数将为每个Row调用,以便返回任意值(单个值、集合和映射以及对象)。

以下示例提取name列并发出其值:

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

或者,有一个映射到单个值的快捷方式:

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者您可以将其映射到具有bean属性或记录组件的结果对象:

	// 假设Person上有一个name属性
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
null如何处理?
null值。Reactive Streams规范禁止发出 null值。该要求要求在提取函数中正确处理 null。虽然您可以从 Row中获取 null值,但不得发出 null值。您必须将任何 null值包装在对象中(例如,对于单个值使用 Optional),以确保 null值永远不会直接由提取函数返回。

更新 (INSERTUPDATEDELETE) 使用 DatabaseClient

rowsUpdated()来消耗结果。

以下示例显示了一个UPDATE语句,返回更新的行数:

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

绑定查询值

典型的应用程序需要使用参数化的SQL语句根据一些输入选择或更新行。这些通常是由WHERE子句约束的SELECT语句或接受输入参数的INSERTUPDATE语句。如果参数没有正确转义,参数化语句会带来SQL注入的风险。DatabaseClient利用了R2DBC的bind API来消除查询参数的SQL注入风险。您可以使用execute(…)操作符提供一个带参数的SQL语句,并将参数绑定到实际的Statement上。然后您的R2DBC驱动程序通过使用准备好的语句和参数替换来运行该语句。

参数绑定支持两种绑定策略:

  • 按索引绑定,使用从零开始的参数索引。

  • 按名称绑定,使用占位符名称。

以下示例展示了查询的参数绑定:

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

或者,您可以传入一个名称和值的映射:

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者,您可以传入一个具有bean属性或记录组件的参数对象:

	// 假设Person上有id、name、age属性
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);
R2DBC本地绑定标记

R2DBC使用依赖于实际数据库供应商的数据库本地绑定标记。例如,Postgres使用索引标记,如$1$2$n。另一个例子是SQL Server,它使用以@为前缀的命名绑定标记。

这与JDBC不同,JDBC需要?作为绑定标记。在JDBC中,实际的驱动程序会在语句执行过程中将?绑定标记转换为数据库本地标记。

Spring Framework的R2DBC支持允许您使用本地绑定标记或使用:name语法的命名绑定标记。

命名参数支持利用BindMarkersFactory实例在查询执行时将命名参数扩展为本地绑定标记,这使您在各种数据库供应商之间具有一定程度的查询可移植性。

查询预处理器将命名Collection参数展开为一系列绑定标记,以消除基于参数数量的动态查询创建的需求。嵌套对象数组被展开以允许使用(例如)选择列表。

考虑以下查询:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

上述查询可以参数化并如下运行:

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)
选择列表的使用取决于供应商。

以下示例展示了使用IN谓词的简化变体:

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))
R2DBC本身不支持类似集合的值。然而,在Spring的R2DBC支持中,将给定的List展开可以用于命名参数,例如用于IN子句中。然而,插入或更新数组类型的列(例如在Postgres中)需要一个由底层R2DBC驱动程序支持的数组类型:通常是一个Java数组,例如String[]用于更新text[]列。不要将Collection<String>等作为数组参数传递。

语句过滤器

有时您需要在实际运行Statement之前微调选项。为此,可以向DatabaseClient注册一个Statement过滤器(StatementFilterFunction),以拦截和修改语句在执行过程中的行为,如下例所示:

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient还提供了一个简化的filter(…)重载,接受一个Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction实现允许过滤StatementResult对象。

DatabaseClient 最佳实践

DatabaseClient类的实例在配置后是线程安全的。这一点很重要,因为这意味着您可以配置一个DatabaseClient的实例,然后安全地将这个共享引用注入到多个DAO(或存储库)中。DatabaseClient是有状态的,因为它保持对ConnectionFactory的引用,但这种状态不是会话状态。

在使用DatabaseClient类时的常见做法是在Spring配置文件中配置一个ConnectionFactory,然后将这个共享的ConnectionFactory bean依赖注入到DAO类中。在ConnectionFactory的setter方法中创建DatabaseClient。这会导致DAO类如下所示:

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// 基于R2DBC的CorporateEventDao方法的实现...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// 基于R2DBC的CorporateEventDao方法的实现...
}

除了显式配置之外,另一种方法是使用组件扫描和注解支持进行依赖注入。在这种情况下,您可以使用@Component对类进行注解(使其成为组件扫描的候选对象),并使用@AutowiredConnectionFactory的setter方法进行注解。以下示例展示了如何实现:

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// 基于R2DBC的CorporateEventDao方法的实现...
}
1 使用@Component对类进行注解。
2 使用@AutowiredConnectionFactory的setter方法进行注解。
3 使用ConnectionFactory创建一个新的DatabaseClient
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// 基于R2DBC的CorporateEventDao方法的实现...
}
1 使用@Component对类进行注解。
2 使用构造函数注入ConnectionFactory
3 使用ConnectionFactory创建一个新的DatabaseClient

无论您选择使用上述哪种模板初始化样式(或不使用),每次运行SQL时几乎不需要创建DatabaseClient类的新实例。一旦配置好,DatabaseClient实例是线程安全的。如果您的应用程序访问多个数据库,您可能需要多个DatabaseClient实例,这就需要多个ConnectionFactory,随后需要多个不同配置的DatabaseClient实例。

检索自动生成的键

INSERT语句在向定义了自增列或标识列的表中插入行时可能会生成键。要对生成的列名具有完全控制权,只需注册一个StatementFilterFunction,该函数请求所需列的生成键。

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// 一旦INSERT语句执行完成,generatedId将发出生成的键
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// 一旦INSERT语句执行完成,generatedId将发出生成的键

控制数据库连接

本节涵盖:

使用ConnectionFactory

Spring通过ConnectionFactory从数据库获取R2DBC连接。 ConnectionFactory是R2DBC规范的一部分,是驱动程序的常见入口点。它允许容器或框架隐藏应用程序代码中的连接池和事务管理问题。作为开发人员,您不需要了解如何连接到数据库的详细信息。这是设置ConnectionFactory的管理员的责任。在开发和测试代码时,您很可能会充当这两种角色,但不一定需要了解生产数据源的配置方式。

当您使用Spring的R2DBC层时,可以使用第三方提供的连接池实现来配置自己的连接池。一个常用的实现是R2DBC Pool(r2dbc-pool)。Spring分发中的实现仅用于测试目的,不提供连接池。

要配置ConnectionFactory

  1. 像通常获取R2DBC ConnectionFactory一样获取连接。

  2. 提供一个R2DBC URL(请参阅您的驱动程序文档以获取正确的值)。

以下示例显示了如何配置ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

使用ConnectionFactoryUtils

ConnectionFactoryUtils类是一个方便且强大的助手类,提供了从ConnectionFactory获取连接和关闭连接(如果需要)的static方法。

它支持订阅者Context绑定的连接,例如R2dbcTransactionManager

使用SingleConnectionFactory

SingleConnectionFactory类是DelegatingConnectionFactory接口的实现,它包装一个不会在每次使用后关闭的单个Connection

如果任何客户端代码调用close,假设是一个连接池连接(例如在使用持久性工具时),您应将suppressClose属性设置为true。此设置返回一个包装物理连接的关闭抑制代理。请注意,您不能再将其转换为本机Connection或类似对象。

SingleConnectionFactory主要是一个测试类,可用于特定要求,例如如果您的R2DBC驱动程序允许进行流水线处理。与连接池ConnectionFactory相比,它始终重用相同的连接,避免过度创建物理连接。

使用TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy是目标ConnectionFactory的代理。该代理包装目标ConnectionFactory,以增加对Spring管理事务的意识。

如果您使用未与Spring的R2DBC支持集成的R2DBC客户端,则需要使用此类。在这种情况下,您仍然可以使用此客户端,并且同时使该客户端参与Spring管理的事务。通常最好将R2DBC客户端与具有适当访问ConnectionFactoryUtils以进行资源管理的集成。

有关更多详细信息,请参阅TransactionAwareConnectionFactoryProxy javadoc。

使用R2dbcTransactionManager

R2dbcTransactionManager类是单个R2DBC ConnectionFactoryReactiveTransactionManager实现。它将来自指定ConnectionFactory的R2DBC Connection绑定到订阅者Context,可能允许每个ConnectionFactory一个订阅者Connection

应用程序代码需要通过ConnectionFactoryUtils.getConnection(ConnectionFactory)检索R2DBC Connection,而不是使用R2DBC的标准ConnectionFactory.create()。所有框架类(例如DatabaseClient)都隐式使用此策略。如果不使用事务管理器,则查找策略的行为与ConnectionFactory.create()完全相同,因此可以在任何情况下使用。