【】*敏*感*词*文件中改动的内容及项目说明
优采云 发布时间: 2021-05-03 20:12【】*敏*感*词*文件中改动的内容及项目说明
项目说明
1、当用户在浏览器中单击产品以对产品进行评分时,将调用产品服务的界面。
2、评分界面通过记录器将用户,产品,评分和其他信息输出到文件中。
3、 Flume监视日志文件,并通过日志主题将日志信息发送到Kafka。
4、清洁服务接收从日志主题发送的消息,并通过关键字过滤掉有效信息,然后通过推荐主题将有效信息发送给Kafka。
5、推荐服务接收推荐者主题的消息,并在一系列处理(例如实时算法处理)之后将其推送给用户。
工具安装JDK安装
JDK下载链接
下载jdk-8u281-linux-x6 4. tar.gz压缩包,并将其解压缩到hadoop用户主目录的jvm文件夹中
cd ~
mkdir jvm
tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm
编辑环境变量:
vim ~/.bashrc
添加JAVA_HOME:
export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281
export PATH=$JAVA_HOME/bin
使环境变量生效:
source ~/.bashrc
检查Java版本:
java -version
检查环境变量是否正确:
# 检验变量值
echo $JAVA_HOME
java -version
# 与直接执行 java -version 一样
$JAVA_HOME/bin/java -version
Zookeeper安装
Zookeeper下载地址:或。下载apache-zookeeper- 3. 6. 3-bin.tar.gz。
解压缩文件:
tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ./
重命名:
mv apache-zookeeper-3.6.3-bin zookeeper
输入zookeeper文件以创建一个文件夹:
cd zookeeper
mkdir tmp
复制模板配置文件并修改:
cp ./conf/zoo-sample.cfg ./conf/zoo.cfg
vim ./conf/zoo.cfg
将dataDir的路径更改为刚创建的tmp目录的路径:
启动Zookeeper:
./bin/zkServer.sh start
正在启动Zookeeper…STARTED,表示启动成功。
如果需要停止Zookeeper,可以使用stop命令将其停止:
./bin/zkServer.sh stop
Flume-ng安装
通过wget下载水槽安装软件包:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
解压缩到主目录:
tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~
重命名:
mv apache-flume-1.9.0-bin flume
在水槽的conf目录中创建log-kafka.properties,内容为:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail -f /home/hadoop/flume/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
以上配置文件功能说明:
使用tail -f /home/hadoop/flume/log/agent.log命令通过正则表达式监视文件中更改的内容。 + PRODUCT_RATING_PREFIX。+匹配的内容,并将匹配的结果发送到Kafka的日志主题中的本地主机:9092。
对于上述配置参数,通常需要了解源,通道和*敏*感*词*。对于这三个部分之间的关系,这位官员给了一张照片:
Flume分布式系统中的核心角色是代理,而flume 采集系统是通过连接每个代理而形成的。每个代理都等效于一个数据传递者(封装为Event对象),其中收录三个组件:
来源
采集组件用于与数据源交互以获取数据。
水槽
*敏*感*词*组件,用于将数据传输到下一级代理或将数据传输到最终存储系统。
频道
传输通道组件,用于将数据从源传输到*敏*感*词*。
进入水槽目录并执行启动命令:
cd ~/flume
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
Kafka安装
通过wget下载安装软件包:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
解压缩到主目录:
tar -zxf kafka_2.12-2.8.0.tgz -C ~
重命名:
mv kafka_2.12-2.8.0.tgz kafka
进入kafka目录:
cd kafka
修改Kafka配置:
vim config/server.properties
listeners=PLAINTEXT://:9092
# 192.168.1.43为本机ip
advertised.listeners=PLAINTEXT://192.168.1.43:9092
zookeeper.connect=localhost:2181
启动kafka(启动Zookeeper后):
bin/kafka-server-start.sh -daemon ./config/server.properties
如果您需要关闭Kafka,请执行:
bin/kafka-server-stop.sh
创建主题主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic recommender
在控制台上发送一条消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommender
输入命令后,控制台将显示需要输入的信息,此时输入的信息将在按Enter键后发送到kafka。
ctrl + c退出。
控制台上的输出消耗消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommender
您可以打开一个在两个终端中发送消息,而另一个在两个终端中接收消息。
服务建设
Maven项目结构:
BigData
├── BusinessServer #商品服务
├── KafkaStreaming #清洗服务
└── StreamingRecommender #推荐服务
商品服务
BusinessServer(SpringBoot项目)
主要提供一个宁静的界面,该界面用于将关键信息打印到控制台,并将日志输出配置为在水槽配置中指定的日志文件。
评分界面:
package cn.javayuli.businessserver.web;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 评分controller
*
* @author 韩桂林
*/
@RestController
public class RatingController {
private static final Logger LOGGER = LoggerFactory.getLogger(RatingController.class);
private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX";
/**
* 用户对商品进行评分
*
* @param user 用户
* @param product 商品
* @param score 分数
* @return
*/
@GetMapping("/rate")
public String doRate(@RequestParam String user, @RequestParam String product, @RequestParam Double score) {
LOGGER.info(PRODUCT_RATING_PREFIX + ":" + user +"|"+ product +"|"+ score +"|"+ System.currentTimeMillis()/1000);
return "SUCCESS";
}
}
在application.properties中配置启动端口和log4j文件输出路径:
server.port=7001
logging.file.name=/home/hadoop/flume/log/agent.log
将项目键入一个jar包中,将其上传到服务器,然后使用java -jar ****。jar运行该项目。
清洁服务
KafkaStreaming(非SpringBoot项目)
介绍与kafka-streams相关的软件包:
org.apache.kafka
kafka-streams
2.8.0
org.apache.kafka
kafka-clients
2.8.0
创建处理器:
package cn.javayuli.kafkastream.processor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* 日志预处理
*
* @author hanguilin
*/
public class LogProcessor implements Processor {
private ProcessorContext context;
private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX:";
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 根据前缀过滤日志信息,提取后面的内容
if(input.contains(PRODUCT_RATING_PREFIX)){
System.out.println("product rating coming!!!!" + input);
input = input.split(PRODUCT_RATING_PREFIX)[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void close() {
}
}
创建主要功能:
package cn.javayuli.kafkastream;
import cn.javayuli.kafkastream.processor.LogProcessor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
/**
* @author hanguilin
*/
public class KafkaStreamApp {
public static void main(String[] args) {
// kafka地址
String brokers = "192.168.1.43:9092";
// 定义输入和输出的topic
String from = "log";
String to = "recommender";
// 定义kafka streaming的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
// 拓扑建构器
StreamsBuilder builder = new StreamsBuilder();
Topology build = builder.build();
// 定义流处理的拓扑结构
build.addSource("SOURCE", from)
.addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESS");
KafkaStreams streams = new KafkaStreams(build, settings);
streams.start();
}
}
将项目键入jar包中,将其上传到服务器,然后使用java -cp ****。jar cn.javayuli.kafkastream.KafkaStreamApp运行项目。
推荐服务
StreamingRecommender(非SpringBoot项目)
此处仅使用邮件,不进行推荐计算。
主要功能:
package cn.javayuli.streamrecommender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @author hanguilin
*/
public class ConsumerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.43:9092");
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Arrays.asList("recommender"));
while (true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
将项目键入一个jar包中,将其上传到服务器,然后使用java -cp ****。jar cn.javayuli.streamrecommender.ConsumerApp运行该项目。
如果需要在非服务器上远程测试程序,则需要打开服务器的7001(BusinessServer)和9092(Kafka)端口。有关端口命令,请参阅文章“ CentOS7端口命令”。
数据模拟
发送评分请求:
首先,产品服务将打印出日志:
查看/home/hadoop/flume/log/agent.log
如您所见,商品服务将日志附加到/home/hadoop/flume/log/agent.log文件。
这时,Flume检测到文件内容已更改,并将其他内容发送到Kafka日志主题。
这时,清洁服务从日志主题中获取收录PRODUCT_RATING_PREFIX的日志信息,并将处理后的信息发送到推荐主题。
(下图显示从日志中取出的数据,而不是已处理的数据)
由于推荐服务订阅了推荐者主题,因此使用了该消息。
资源地址
只有关键代码发布在文章中,请检查git信息库“推荐”中的所有代码。