细节内容:处理文件上传需要注意的细节
优采云 发布时间: 2022-11-21 01:19细节内容:处理文件上传需要注意的细节
1.上传文件中文乱码
1.1 解决文件乱码
ServletFileUpload.setHeaderEncoding("UTF-8");
1.2 解决普通输入项乱码(注意当表单类型为multipart/form-data时,设置请求的编码无效)
FileItem.setString("UTF-8"); //解决乱码
2.在处理表格之前,记得调用:
ServletFileUpload.isMultipartContent 方法判断提交表单的类型。如果该方法返回true,则作为上传方法处理;否则,表格可以用传统方式处理。
3.设置parser buffer的大小,以及临时文件的删除
设置解析器缓冲区的大小:DiskFileItemFactory.setSizeThreshold(1024*1024);
删除临时文件:在程序中处理完上传的文件后,一定要记得调用item.delete()方法删除临时文件
4、做上传系统的时候一定要注意上传文件的存放目录。上传文件的存放目录不能被外界直接访问。
5.限制上传文件的类型
处理上传文件时,判断上传文件的后缀是否允许
6.限制上传文件的大小
调用解析器的 ServletFileUpload.setFileSizeMax(102410245); 限制上传文件的大小。如果上传的文件超过限制,解析器将抛出 FileUploadBase.FileSizeLimitExceededException 异常。程序员可以通过检查异常是否被捕获来使用户友好。暗示。
7.如何判断上传输入项为空
" />
字符串文件名 = 项目。获取名称()。substring(item.getName().lastIndexOf("\")+1); “”
if(filename==null || filename.trim().equals("")){
继续;
}
8.为避免覆盖上传的文件,程序在保存上传文件时应为每个文件生成一个唯一的文件名
公共字符串生成文件名(字符串文件名){
//83434-83u483-934934
返回 UUID.randomUUID().toString() + "_" + 文件名;
}
9、为避免一个文件夹下保存超过1000个文件,影响文件访问性能,程序应将上传的文件拆分存储。
public String generateSavePath(字符串路径,字符串文件名){
int hashcode = filename.hashCode(); //121221
int dir1 = 哈希码&15;
int dir2 = (哈希码>>4)&0xf;
String savepath = path + File.separator + dir1 + File.separator + dir2;
File file = new File(savepath);
<p>
" />
if(!file.exists()){
file.mkdirs();
}
return savepath;
}
</p>
10.监控上传进度
ServletFileUpload upload = new ServletFileUpload(工厂);
upload.setProgressListener(new ProgressListener(){
public void update(long pBytesRead, long pContentLength, int pItems) {
System.out.println("当前解析:" + pBytesRead);
}
});
11、网页添加动态上传输入项
DiskFileItemFactory 是一个用于创建 FileItem 对象的工厂。该工厂类的常用方法:ServletFileUpload负责处理上传的文件数据,将表单中的每一个输入项封装到一个FileItem对象中。常用方法有:
源代码:
解决方案:使用Debezium、Postgres和Kafka进行数据实时采集(CDC)
一、背景
我一直在完善自己的微服务架构,其中包括分布式工作流服务的构建,目前使用的是Camunda工作流引擎。使用Camunda工作流会涉及到如何将工作流引擎的用户系统与现有的用户系统集成的问题(Flowable和Activity类似)。在现有设计中,工作流定位着重于企业内部流程的流转,因此系统在设计上与Camunda工作流用户系统对应单位、部门、人员、人员归属。
功能设计完成后,又面临一个问题,如何解决现有人事系统数据[实时]同步到Camunda工作流引擎的问题。如果现有的系统数据和工作流数据在同一个库中,相对容易解决。在微服务架构中,不同服务的数据通常存储在不同的数据库中,因此需要数据同步。使用的方法不同,所能达到的效果也是一样的。
最初,考虑了以下两个选项,但都略有不足:
经过大量资料的查询和比对,最终选择了德贝子木来解决以上问题以及以后更多的数据同步问题。
2. Debezium简介
RedHat 开源的 Debezium 是一个开源工具,可以从多个数据源中捕获实时变化的数据,并形成数据流输出。
它是一个 CDC(变更数据捕获)工具。其工作原理类似于大家熟知的Canal、DataBus、Maxwell等,通过提取数据库日志获取变化。
官方介绍是:
Debezium 是一个用于变更数据捕获的开源分布式平台。启动它,将其指向您的数据库,您的应用程序就可以开始响应其他应用程序提交给您的数据库的所有插入、更新和删除操作。Debezium 耐用且快速,因此即使出现问题,您的应用程序也可以快速响应并且不会错过任何事件
Debezium 是一个分布式平台,可以将您现有的数据库变成事件流,因此应用程序可以看到数据库中的每个行级更改并立即做出响应。Debezium 建立在 Apache Kafka 之上,并提供 Kafka connect 兼容的连接器来监控特定的数据库管理系统。
Debezium 现在支持以下数据库:
与ETL不同的是,德贝子木只支持生产端连接数据库,消费端不支持连接数据库,需要自己编写代码接收Kafka消息数据。经过分析,这种方式更加灵活,在现有的微服务架构中也能很好的利用Kafka。
3.快速搭建德贝子木测试环境。
目前,Debezium 的最新稳定版本是 1.6。Debezium已经将要使用的组件打包成Docker镜像,所以我们只需要按照以下步骤安装并启动Docker即可快速搭建测试环境。
Windows下如何搭建Docker环境,可以参考我的相关文章:
(1) Windows 10 2004 (20H1) 安装Docker Desktop for Windows (2.3.0.2) 以WSL 2模式运行容器
(2)对于Windows 10,将Docker Desktop for Windows(WSL 2模式)的文件存放移出C盘,放在其他目录下
3.1 运行动物园管理员
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6
3.2 运行卡夫卡
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6
3.3 运行 PostgreSQL
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6
以上代码使用的是:debezium/example-postgres:1.6,查看Debezimu官方文档,其他例子都是这个。其实德贝子木有Docker封装PostgreSQL 9~13,大家可以根据自己的需要在Docker Hub中选择对应的PostgreSQL版本。
debezium/postgres 体积小,使用方便,并且已经做了必要的设置,可以直接使用,不需要额外配置。
3.4 运行Debezimu Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6
Debezium容器启动时需要传入如下环境变量:
3.5 创建连接器
经过以上4个步骤,Debezium的测试环境搭建完成,接下来需要调用Debezium提供的API创建连接器,这样Debezium与数据库的关系就建立起来了。我们将以下有效负载发布到:8083/connectors/。
{
"name": "fulfillment-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.99.100",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "fulfillment",
"table.include.list": "public.inventory"
}
}
"name": 注册到Kafka Connect服务的Connector名称 "connector.class": PostgreSQL连接器类名称 "database.hostname": PostgreSQL数据库地址 "database.port": 端口PostgreSQL 数据库 "database.user": PostgreSQL 数据库的用户名 "database.password": PostgreSQL 数据密码 "database.dbname": 连接的PostgreSQL 数据库 "database.server.name": 虚拟数据库服务器的名称,其中可根据实际需要定义,消费Kafka数据时应使用该值 "table.include.list":监控的数据表列表,以","分隔。PostgreSQL 需要以“.”格式写入完整的表名。如果没有具体的Schema,
以下是完成的 curl 命令:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.99.100", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres", "database.server.name": "fulfillment", "table.include.list": "public.inventory" }}'
上面是一个例子,因为我用的是Windows,个人觉得curl不方便,所以改用postman:
3.6 Docker Compose 配置
为了方便使用,将上述Docker命令集成到Docker Compose配置中,如下:
version: "3"
services:
postgres:
image: debezium/postgres:13
container_name: postgres
hostname: postgres
environment:
POSTGRES_USER: herodotus
POSTGRES_PASSWORD: herodotus
ports:
- 5432:5432
zookeeper:
image: debezium/zookeeper:1.6
container_name: zookeeper
restart: always
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.6
container_name: kafka
restart: always
ports:
- 9092:9092
environment:
ZOOKEEPER_CONNECT: zookeeper:2181
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- zookeeper
connect:
image: debezium/connect:1.6
container_name: connect
restart: always
ports:
- 8083:8083
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: herodotus_connect_configs
OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
STATUS_STORAGE_TOPIC: herodotus_connect_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- kafka
4.外部数据库配置
在上一章中,我们介绍了德贝子木测试环境的方式。其中使用的debezium/postgres已经配置好,使用起来比较方便。在实际使用中,PostgreSQL往往是独立构建的,因此需要对PostgreSQL进行配置。
4.1 以Docker方式运行基本组件
本章主要介绍德贝子木与独立PostgreSQL数据库的连接。因此,除了PostgreSQL,Zookeeper、Kafka、Debezimu Connect仍然是使用Docker部署的。具体部署的Docker Compose配置如下:
version: "3"
services:
zookeeper:
image: debezium/zookeeper:1.6
container_name: zookeeper
hostname: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.6
container_name: kafka
hostname: kafka
ports:
- 9092:9092
environment:
BROKER_ID: 1
ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER
KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
<p>
" />
connect:
image: debezium/connect:1.6
container_name: connect
hostname: connect
ports:
- 8083:8083
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: herodotus_connect_configs
OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
STATUS_STORAGE_TOPIC: herodotus_connect_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- kafka
</p>
其中Kafka Listener相关的配置是为了解决Spring Kafka在连接Kafka时会出现:Connection to node -1 could not be established的问题。经纪人可能不可用。
4.2 修改PostgreSQL配置
PostgreSQL 在 9.4 中添加了逻辑解码功能,它是一种允许提取提交到事务日志的更改并借助输出插件以用户友好的方式处理这些更改的机制。输出插件使更改对客户端可用。
PostgreSQL连接器读取和处理数据库变化主要由两部分组成:
Java代码(即连接Kafka Connect的代码):负责读取Logical Decoding输出插件生成的数据。4.2.1 修改PostgreSQL配置
在${PostgreSQL_HOME}/13/data 目录下,找到postgresql.conf。
修改以下配置:
wal_level=logical
max_wal_senders=1
max_replication_slots=1
配置完成后记得重启数据库
4.2.2 设置数据库权限
需要将复制权限分配给 PostgreSQL 用户。定义一个 PostgreSQL 角色并分配至少两个权限:REPLICATION 和 LOGION。示例代码如下:
CREATE ROLE REPLICATION LOGIN;
具体操作请参考脚本:
-- pg新建用户
CREATE USER user WITH PASSWORD 'pwd';
-- 给用户复制流权限
ALTER ROLE user replication;
-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;
-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
4.3 创建连接器
将以下有效负载发布到:8083/connectors/
{
"name": "herodotus-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.101.10",
"database.port": "15432",
"database.user": "athena",
"database.password": "athena",
"database.dbname" : "athena",
"database.server.name": "herodotus",
"slot.name": "herodotus_slot",
"table.include.list": "public.sys_organization",
"publication.name": "herodotus_public_connector",
"publication.autocreate.mode": "filtered",
"plugin.name": "pgoutput"
}
}
Postman界面操作如下:
payload有两个字段,name是connector的名称,config是connector的配置信息。下表解释了配置中的字段:
字段名称说明
连接器类
connector的实现类,本文使用io.debezium.connector.postgresql.PostgresConnector,因为我们的数据库是PostgreSQL
数据库主机名
数据库服务的IP或域名
数据库端口
数据库服务的端口
数据库.用户
连接到数据库的用户
数据库.密码
连接数据库的密码
数据库.dbname
数据存储名称
数据库.server.name
每个被监控的表都会对应Kafka中的一个topic,topic的命名约定为..
插槽名称
PostgreSQL 复制槽(Replication Slot)名称
表.include.list
如果设置了 table.include.list,则此列表中的表将由 Debezium 监控
插件名称
PostgreSQL服务器安装的解码插件名称,可以是decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming、pgoutput。如果未指定此值,则默认使用 decoderbufs。
本例中使用pgoutput是因为它是PostgreSQL 10+内置的*敏*感*词*,而其他*敏*感*词*必须在PostgreSQL服务器上安装插件。
出版物名称
PostgreSQL端WAL发布(publication)的名字,每个Connector在PostgreSQL中应该有自己对应的发布,如果不指定这个参数,那么发布的名字就是dbz_publication
发布.autocreate.mode
该值仅在 plugin.name 设置为 pgoutput 时有效。有以下三个值:
all_tables - debezium 将检查发布是否存在,如果发布不存在,连接器将使用脚本 CREATE PUBLICATION
FOR ALL TABLES创建一个发布,即发布者会*敏*感*词*所有表的变化。
disabled - 连接器不会检查发布是否存在。如果发布不存在,创建连接器时会报错。
filtered - 与 all_tables 不同,debezium 会根据连接器配置中的 table.include.list 生成脚本来创建发布:CREATE PUBLICATION
对于表。例如,在这个例子中,如果“table.include.list”的值为“public.sys_organization”,那么发布将只*敏*感*词*这个表的变化。
下面结合本例中connector的配置信息,对几个关键属性做进一步说明:
Slot.name 亮点
根据上面的例子,Debezium 会在 PostgreSQL 中创建一个名为 herodotus_slot 的复制槽。本例中创建的connector需要通过这个replication slot获取数据变化的信息。
可以通过以下sql查看replication slot的信息:
select * from pg_replication_slots;
上图中active_pid为14200,即进程ID为14200的wal_sender进程已经在使用这个replication slot与Debezium进行交互
database.server.name 和 table.include.list
当connector获取到数据变化的信息后,将信息转换成统一的数据格式,发布到Kafka的topic中。Debezium规定一张表对应一个topic,topic名称的格式为..,本例中表的数据变化消息会保存在Kafka的topic herodotus.public.sys_organization中。
可以通过以下代码查看接收到的信息
@KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium")
public void received(String message) {
log.info("[Herodotus] |- Recived message from Debezium : [{}]", message);
}
5. 运行测试
现在,基于以上环境的配置,大家可以测试一下Debezium抓包的效果了。可以进入Kafka容器,使用Kafka提供的kafka-console-consumer.sh查看Topic收到的数据。具体命令如下:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization
5.1 插入测试
在数据库sys_organization中插入一条数据
Kafka 的消费者命令行工具收到来自 Debezium 的数据更改消息:
格式化消息体如下,这里忽略schema字段,重点关注payload.before、payload.after和payload.op字段:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
<p>
" />
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "AAAAA",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594964405,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461608\",\"63461608\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2460,
"lsn": 63461896,
"xmin": null
},
"op": "c",
"ts_ms": 1626594964846,
"transaction": null
}
}
</p>
既然是插入操作,op就是c(create),before是null,after就是我们插入的数据。
5.2 更新测试
修改数据库sys_organization中的一条数据
Kafka 的消费者命令行工具收到来自 Debezium 的数据更改消息:
格式化后的消息体如下:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "BBBBB",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626595173601,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63466888\",\"63466888\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2461,
"lsn": 63467176,
"xmin": null
},
"op": "u",
"ts_ms": 1626595173825,
"transaction": null
}
}
更新商品信息后,消费者会收到op为u(update)的消息,after是修改后的数据。
5.3 删除测试
删除数据库sys_organization中的一条数据
Kafka 的消费者命令行工具收到来自 Debezium 的数据更改消息:
格式化后的消息体如下:
{
"schema": {
...
},
"payload": {
"before": {
"organization_id": "3",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": null,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": null,
"parent_id": null,
"partition_code": null,
"short_name": null
},
"after": null,
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594566933,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461120\",\"63461120\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2458,
"lsn": 63461176,
"xmin": null
},
"op": "d",
"ts_ms": 1626594567136,
"transaction": null
}
}
删除商品信息后,消费者会收到op为d(delete)的消息,其中before为删除前的数据,after为null。
6.总结
通过德贝子木进行数据同步,不仅解决了传统ETL时效性低的问题,也解决了基于消息队列在两端编写代码的工程工作量,基于容器的方式更适合微服务架构的使用,使用Kafka进行消费终端集成,使得集成方式更加灵活方便,终端类型更加丰富。
示例代码地址:
参考
[1] :
[2] :
[3] :#postgresql-概述