使用R2DBC进行数据访问
R2DBC("Reactive Relational Database Connectivity")是一个由社区驱动的规范努力,旨在使用响应式模式标准化访问SQL数据库。
包层次结构
Spring Framework的R2DBC抽象框架由两个不同的包组成:
-
core
:org.springframework.r2dbc.core
包含DatabaseClient
类以及各种相关类。请参阅使用R2DBC核心类控制基本的R2DBC处理和错误处理。 -
connection
:org.springframework.r2dbc.connection
包含一个用于轻松访问ConnectionFactory
的实用类,以及各种简单的ConnectionFactory
实现,您可以用于测试和运行未修改的R2DBC。请参阅控制数据库连接。
使用R2DBC核心类来控制基本的R2DBC处理和错误处理
本节介绍如何使用R2DBC核心类来控制基本的R2DBC处理,包括错误处理。它包括以下主题:
使用 DatabaseClient
DatabaseClient
是 R2DBC 核心包中的中心类。它处理资源的创建和释放,有助于避免常见错误,比如忘记关闭连接。它执行核心 R2DBC 工作流程的基本任务(如语句创建和执行),留给应用代码提供 SQL 并提取结果。 DatabaseClient
类:
-
运行 SQL 查询
-
更新语句和存储过程调用
-
执行
Result
实例的迭代 -
捕获 R2DBC 异常并将其转换为在
org.springframework.dao
包中定义的通用、更具信息性的异常层次结构。 (请参阅 一致的异常层次结构。)
该客户端具有使用响应式类型进行声明式组合的功能性流畅 API。
当您在代码中使用 DatabaseClient
时,您只需要实现 java.util.function
接口,为它们提供明确定义的契约。给定由 DatabaseClient
类提供的 Connection
,Function
回调创建一个 Publisher
。提取 Row
结果的映射函数也是如此。
您可以通过直接实例化带有 ConnectionFactory
引用的 DatabaseClient
,或者在 Spring IoC 容器中配置它并将其作为 bean 引用提供给 DAO,来在 DAO 实现中使用 DatabaseClient
。
创建 DatabaseClient
对象的最简单方法是通过静态工厂方法,如下所示:
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
应始终将 ConnectionFactory 配置为 Spring IoC 容器中的 bean。 |
DatabaseClient
。
DatabaseClient.builder()
获取一个
Builder
实例。您可以通过调用以下方法自定义客户端:
-
….bindMarkers(…)
:提供特定的BindMarkersFactory
来配置命名参数到数据库绑定标记的转换。 -
….executeFunction(…)
:设置ExecuteFunction
如何运行Statement
对象。 -
….namedParameters(false)
:禁用命名参数扩展。默认启用。
方言由 BindMarkersFactoryResolver 从 ConnectionFactory 解析,通常通过检查 ConnectionFactoryMetadata 。 您可以让 Spring 自动发现您的 BindMarkersFactory ,方法是注册一个实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类,通过 META-INF/spring.factories 。 BindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供程序实现。 |
-
H2
-
MariaDB
-
Microsoft SQL Server
-
MySQL
-
Postgres
DEBUG 级别记录。此外,每次执行都会在响应序列中注册一个检查点,以帮助调试。
DatabaseClient
使用示例。这些示例并非是
DatabaseClient
公开的所有功能的详尽列表。有关详细信息,请参阅附带的
javadoc。
执行语句
DatabaseClient
提供了运行语句的基本功能。以下示例显示了创建新表所需的最小但完全功能的代码:
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient
设计用于方便、流畅的使用。它在执行规范的每个阶段暴露中间、继续和终端方法。上述示例使用 then()
返回一个完成的 Publisher
,一旦查询完成(或者如果 SQL 查询包含多个语句,则为多个语句),它就会完成。
execute(…) 接受 SQL 查询字符串或查询 Supplier<String> ,以推迟实际查询的创建直到执行。 |
查询 (SELECT
)
SQL查询可以通过Row
对象或受影响行数返回值。根据发出的查询,DatabaseClient
可以返回更新的行数或行本身。
以下查询从表中获取id
和name
列:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用绑定变量:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
fetch()
。
fetch()
是一个继续运算符,允许您指定要消耗多少数据。
调用first()
返回结果中的第一行并丢弃其余行。您可以使用以下运算符消耗数据:
-
first()
返回整个结果的第一行。其Kotlin Coroutine变体对于非空返回值命名为awaitSingle()
,如果值是可选的,则为awaitSingleOrNull()
。 -
one()
返回确切的一个结果,如果结果包含更多行则失败。使用Kotlin Coroutines,对于确切的一个值使用awaitOne()
,如果值可能为null
则使用awaitOneOrNull()
。 -
all()
返回结果的所有行。在使用Kotlin Coroutines时,请使用flow()
。 -
rowsUpdated()
返回受影响的行数(INSERT
/UPDATE
/DELETE
计数)。其Kotlin Coroutine变体命名为awaitRowsUpdated()
。
如果没有指定进一步的映射细节,查询将以Map
的形式返回表格结果,其键是不区分大小写的列名,映射到其列值。
您可以通过提供一个Function<Row, T>
来控制结果映射,该函数将为每个Row
调用,以便返回任意值(单个值、集合和映射以及对象)。
以下示例提取name
列并发出其值:
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
或者,有一个映射到单个值的快捷方式:
Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
或者您可以将其映射到具有bean属性或记录组件的结果对象:
// 假设Person上有一个name属性
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
更新 (INSERT
、UPDATE
和DELETE
) 使用 DatabaseClient
rowsUpdated()
来消耗结果。
以下示例显示了一个UPDATE
语句,返回更新的行数:
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
绑定查询值
典型的应用程序需要使用参数化的SQL语句根据一些输入选择或更新行。这些通常是由WHERE
子句约束的SELECT
语句或接受输入参数的INSERT
和UPDATE
语句。如果参数没有正确转义,参数化语句会带来SQL注入的风险。DatabaseClient
利用了R2DBC的bind
API来消除查询参数的SQL注入风险。您可以使用execute(…)
操作符提供一个带参数的SQL语句,并将参数绑定到实际的Statement
上。然后您的R2DBC驱动程序通过使用准备好的语句和参数替换来运行该语句。
参数绑定支持两种绑定策略:
-
按索引绑定,使用从零开始的参数索引。
-
按名称绑定,使用占位符名称。
以下示例展示了查询的参数绑定:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
或者,您可以传入一个名称和值的映射:
Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
或者,您可以传入一个具有bean属性或记录组件的参数对象:
// 假设Person上有id、name、age属性
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
查询预处理器将命名Collection
参数展开为一系列绑定标记,以消除基于参数数量的动态查询创建的需求。嵌套对象数组被展开以允许使用(例如)选择列表。
考虑以下查询:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
上述查询可以参数化并如下运行:
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
选择列表的使用取决于供应商。 |
以下示例展示了使用IN
谓词的简化变体:
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", arrayOf(35, 50))
R2DBC本身不支持类似集合的值。然而,在Spring的R2DBC支持中,将给定的List 展开可以用于命名参数,例如用于IN 子句中。然而,插入或更新数组类型的列(例如在Postgres中)需要一个由底层R2DBC驱动程序支持的数组类型:通常是一个Java数组,例如String[] 用于更新text[] 列。不要将Collection<String> 等作为数组参数传递。 |
语句过滤器
有时您需要在实际运行Statement
之前微调选项。为此,可以向DatabaseClient
注册一个Statement
过滤器(StatementFilterFunction
),以拦截和修改语句在执行过程中的行为,如下例所示:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient
还提供了一个简化的filter(…)
重载,接受一个Function<Statement, Statement>
:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction
实现允许过滤Statement
和Result
对象。
DatabaseClient
最佳实践
DatabaseClient
类的实例在配置后是线程安全的。这一点很重要,因为这意味着您可以配置一个DatabaseClient
的实例,然后安全地将这个共享引用注入到多个DAO(或存储库)中。DatabaseClient
是有状态的,因为它保持对ConnectionFactory
的引用,但这种状态不是会话状态。
在使用DatabaseClient
类时的常见做法是在Spring配置文件中配置一个ConnectionFactory
,然后将这个共享的ConnectionFactory
bean依赖注入到DAO类中。在ConnectionFactory
的setter方法中创建DatabaseClient
。这会导致DAO类如下所示:
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// 基于R2DBC的CorporateEventDao方法的实现...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// 基于R2DBC的CorporateEventDao方法的实现...
}
除了显式配置之外,另一种方法是使用组件扫描和注解支持进行依赖注入。在这种情况下,您可以使用@Component
对类进行注解(使其成为组件扫描的候选对象),并使用@Autowired
对ConnectionFactory
的setter方法进行注解。以下示例展示了如何实现:
-
Java
-
Kotlin
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired (2)
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); (3)
}
// 基于R2DBC的CorporateEventDao方法的实现...
}
1 | 使用@Component 对类进行注解。 |
2 | 使用@Autowired 对ConnectionFactory 的setter方法进行注解。 |
3 | 使用ConnectionFactory 创建一个新的DatabaseClient 。 |
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)
private val databaseClient = DatabaseClient(connectionFactory) (3)
// 基于R2DBC的CorporateEventDao方法的实现...
}
1 | 使用@Component 对类进行注解。 |
2 | 使用构造函数注入ConnectionFactory 。 |
3 | 使用ConnectionFactory 创建一个新的DatabaseClient 。 |
无论您选择使用上述哪种模板初始化样式(或不使用),每次运行SQL时几乎不需要创建DatabaseClient
类的新实例。一旦配置好,DatabaseClient
实例是线程安全的。如果您的应用程序访问多个数据库,您可能需要多个DatabaseClient
实例,这就需要多个ConnectionFactory
,随后需要多个不同配置的DatabaseClient
实例。
检索自动生成的键
INSERT
语句在向定义了自增列或标识列的表中插入行时可能会生成键。要对生成的列名具有完全控制权,只需注册一个StatementFilterFunction
,该函数请求所需列的生成键。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// 一旦INSERT语句执行完成,generatedId将发出生成的键
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// 一旦INSERT语句执行完成,generatedId将发出生成的键
控制数据库连接
本节涵盖:
使用ConnectionFactory
Spring通过ConnectionFactory
从数据库获取R2DBC连接。 ConnectionFactory
是R2DBC规范的一部分,是驱动程序的常见入口点。它允许容器或框架隐藏应用程序代码中的连接池和事务管理问题。作为开发人员,您不需要了解如何连接到数据库的详细信息。这是设置ConnectionFactory
的管理员的责任。在开发和测试代码时,您很可能会充当这两种角色,但不一定需要了解生产数据源的配置方式。
当您使用Spring的R2DBC层时,可以使用第三方提供的连接池实现来配置自己的连接池。一个常用的实现是R2DBC Pool(r2dbc-pool
)。Spring分发中的实现仅用于测试目的,不提供连接池。
要配置ConnectionFactory
:
-
像通常获取R2DBC
ConnectionFactory
一样获取连接。 -
提供一个R2DBC URL(请参阅您的驱动程序文档以获取正确的值)。
以下示例显示了如何配置ConnectionFactory
:
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用ConnectionFactoryUtils
ConnectionFactoryUtils
类是一个方便且强大的助手类,提供了从ConnectionFactory
获取连接和关闭连接(如果需要)的static
方法。
它支持订阅者Context
绑定的连接,例如R2dbcTransactionManager
。
使用SingleConnectionFactory
SingleConnectionFactory
类是DelegatingConnectionFactory
接口的实现,它包装一个不会在每次使用后关闭的单个Connection
。
如果任何客户端代码调用close
,假设是一个连接池连接(例如在使用持久性工具时),您应将suppressClose
属性设置为true
。此设置返回一个包装物理连接的关闭抑制代理。请注意,您不能再将其转换为本机Connection
或类似对象。
SingleConnectionFactory
主要是一个测试类,可用于特定要求,例如如果您的R2DBC驱动程序允许进行流水线处理。与连接池ConnectionFactory
相比,它始终重用相同的连接,避免过度创建物理连接。
使用TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy
是目标ConnectionFactory
的代理。该代理包装目标ConnectionFactory
,以增加对Spring管理事务的意识。
如果您使用未与Spring的R2DBC支持集成的R2DBC客户端,则需要使用此类。在这种情况下,您仍然可以使用此客户端,并且同时使该客户端参与Spring管理的事务。通常最好将R2DBC客户端与具有适当访问ConnectionFactoryUtils 以进行资源管理的集成。 |
有关更多详细信息,请参阅TransactionAwareConnectionFactoryProxy
javadoc。
使用R2dbcTransactionManager
R2dbcTransactionManager
类是单个R2DBC ConnectionFactory
的ReactiveTransactionManager
实现。它将来自指定ConnectionFactory
的R2DBC Connection
绑定到订阅者Context
,可能允许每个ConnectionFactory
一个订阅者Connection
。
应用程序代码需要通过ConnectionFactoryUtils.getConnection(ConnectionFactory)
检索R2DBC Connection
,而不是使用R2DBC的标准ConnectionFactory.create()
。所有框架类(例如DatabaseClient
)都隐式使用此策略。如果不使用事务管理器,则查找策略的行为与ConnectionFactory.create()
完全相同,因此可以在任何情况下使用。