【】*敏*感*词*文件中改动的内容及项目说明

优采云 发布时间: 2021-05-03 20:12

  【】*敏*感*词*文件中改动的内容及项目说明

  项目说明

  

  1、当用户在浏览器中单击产品以对产品进行评分时,将调用产品服务的界面。

  2、评分界面通过记录器将用户,产品,评分和其他信息输出到文件中。

  3、 Flume监视日志文件,并通过日志主题将日志信息发送到Kafka。

  4、清洁服务接收从日志主题发送的消息,并通过关键字过滤掉有效信息,然后通过推荐主题将有效信息发送给Kafka。

  5、推荐服务接收推荐者主题的消息,并在一系列处理(例如实时算法处理)之后将其推送给用户。

  工具安装JDK安装

  JDK下载链接

  下载jdk-8u281-linux-x6 4. tar.gz压缩包,并将其解压缩到hadoop用户主目录的jvm文件夹中

  cd ~

mkdir jvm

tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm

  编辑环境变量:

  vim ~/.bashrc

  添加JAVA_HOME:

  export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281

export PATH=$JAVA_HOME/bin

  使环境变量生效:

  source ~/.bashrc

  检查Java版本:

  java -version

  检查环境变量是否正确:

  # 检验变量值

echo $JAVA_HOME

  java -version

# 与直接执行 java -version 一样

$JAVA_HOME/bin/java -version

  Zookeeper安装

  Zookeeper下载地址:或。下载apache-zookeeper- 3. 6. 3-bin.tar.gz。

  解压缩文件:

  tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ./

  重命名:

  mv apache-zookeeper-3.6.3-bin zookeeper

  输入zookeeper文件以创建一个文件夹:

  cd zookeeper

mkdir tmp

  复制模板配置文件并修改:

  cp ./conf/zoo-sample.cfg ./conf/zoo.cfg

vim ./conf/zoo.cfg

  将dataDir的路径更改为刚创建的tmp目录的路径:

  

  启动Zookeeper:

  ./bin/zkServer.sh start

  

  正在启动Zookeeper…STARTED,表示启动成功。

  如果需要停止Zookeeper,可以使用stop命令将其停止:

  ./bin/zkServer.sh stop

  Flume-ng安装

  通过wget下载水槽安装软件包:

  wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

  解压缩到主目录:

  tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~

  重命名:

  mv apache-flume-1.9.0-bin flume

  在水槽的conf目录中创建log-kafka.properties,内容为:

  agent.sources = exectail

agent.channels = memoryChannel

agent.sinks = kafkasink

# For each one of the sources, the type is defined

agent.sources.exectail.type = exec

# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录

agent.sources.exectail.command = tail -f /home/hadoop/flume/log/agent.log

agent.sources.exectail.interceptors=i1

agent.sources.exectail.interceptors.i1.type=regex_filter

# 定义日志过滤前缀的正则

agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+

# The channel can be defined as follows.

agent.sources.exectail.channels = memoryChannel

# Each sink's type must be defined

agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkasink.kafka.topic = log

agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092

agent.sinks.kafkasink.kafka.producer.acks = 1

agent.sinks.kafkasink.kafka.flumeBatchSize = 20

#Specify the channel the sink should use

agent.sinks.kafkasink.channel = memoryChannel

# Each channel's type is defined.

agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 10000

  以上配置文件功能说明:

  使用tail -f /home/hadoop/flume/log/agent.log命令通过正则表达式监视文件中更改的内容。 + PRODUCT_RATING_PREFIX。+匹配的内容,并将匹配的结果发送到Kafka的日志主题中的本地主机:9092。

  对于上述配置参数,通常需要了解源,通道和*敏*感*词*。对于这三个部分之间的关​​系,这位官员给了一张照片:

  

  Flume分布式系统中的核心角色是代理,而flume 采集系统是通过连接每个代理而形成的。每个代理都等效于一个数据传递者(封装为Event对象),其中收录三个组件:

  来源

  采集组件用于与数据源交互以获取数据。

  水槽

  *敏*感*词*组件,用于将数据传输到下一级代理或将数据传输到最终存储系统。

  频道

  传输通道组件,用于将数据从源传输到*敏*感*词*。

  进入水槽目录并执行启动命令:

  cd ~/flume

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console

  Kafka安装

  通过wget下载安装软件包:

  wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

  解压缩到主目录:

  tar -zxf kafka_2.12-2.8.0.tgz -C ~

  重命名:

  mv kafka_2.12-2.8.0.tgz kafka

  进入kafka目录:

  cd kafka

  修改Kafka配置:

  vim config/server.properties

  listeners=PLAINTEXT://:9092

# 192.168.1.43为本机ip

advertised.listeners=PLAINTEXT://192.168.1.43:9092

zookeeper.connect=localhost:2181

  启动kafka(启动Zookeeper后):

  bin/kafka-server-start.sh -daemon ./config/server.properties

  如果您需要关闭Kafka,请执行:

  bin/kafka-server-stop.sh

  创建主题主题:

  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic recommender

  在控制台上发送一条消息:

  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommender

  输入命令后,控制台将显示需要输入的信息,此时输入的信息将在按Enter键后发送到kafka。

  

  ctrl + c退出。

  控制台上的输出消耗消息:

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommender

  您可以打开一个在两个终端中发送消息,而另一个在两个终端中接收消息。

  服务建设

  Maven项目结构:

  BigData

├── BusinessServer #商品服务

├── KafkaStreaming #清洗服务

└── StreamingRecommender #推荐服务

  商品服务

  BusinessServer(SpringBoot项目)

  主要提供一个宁静的界面,该界面用于将关键信息打印到控制台,并将日志输出配置为在水槽配置中指定的日志文件。

  评分界面:

  package cn.javayuli.businessserver.web;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

/**

* 评分controller

*

* @author 韩桂林

*/

@RestController

public class RatingController {

private static final Logger LOGGER = LoggerFactory.getLogger(RatingController.class);

private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX";

/**

* 用户对商品进行评分

*

* @param user 用户

* @param product 商品

* @param score 分数

* @return

*/

@GetMapping("/rate")

public String doRate(@RequestParam String user, @RequestParam String product, @RequestParam Double score) {

LOGGER.info(PRODUCT_RATING_PREFIX + ":" + user +"|"+ product +"|"+ score +"|"+ System.currentTimeMillis()/1000);

return "SUCCESS";

}

}

  在application.properties中配置启动端口和log4j文件输出路径:

  server.port=7001

logging.file.name=/home/hadoop/flume/log/agent.log

  将项目键入一个jar包中,将其上传到服务器,然后使用java -jar ****。jar运行该项目。

  清洁服务

  KafkaStreaming(非SpringBoot项目)

  介绍与kafka-streams相关的软件包:

  

org.apache.kafka

kafka-streams

2.8.0

org.apache.kafka

kafka-clients

2.8.0

  创建处理器:

  package cn.javayuli.kafkastream.processor;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

/**

* 日志预处理

*

* @author hanguilin

*/

public class LogProcessor implements Processor {

private ProcessorContext context;

private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX:";

@Override

public void init(ProcessorContext context) {

this.context = context;

}

@Override

public void process(byte[] key, byte[] value) {

String input = new String(value);

// 根据前缀过滤日志信息,提取后面的内容

if(input.contains(PRODUCT_RATING_PREFIX)){

System.out.println("product rating coming!!!!" + input);

input = input.split(PRODUCT_RATING_PREFIX)[1].trim();

context.forward("logProcessor".getBytes(), input.getBytes());

}

}

@Override

public void close() {

}

}

  创建主要功能:

  package cn.javayuli.kafkastream;

import cn.javayuli.kafkastream.processor.LogProcessor;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import java.util.Properties;

/**

* @author hanguilin

*/

public class KafkaStreamApp {

public static void main(String[] args) {

// kafka地址

String brokers = "192.168.1.43:9092";

// 定义输入和输出的topic

String from = "log";

String to = "recommender";

// 定义kafka streaming的配置

Properties settings = new Properties();

settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");

settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

// 拓扑建构器

StreamsBuilder builder = new StreamsBuilder();

Topology build = builder.build();

// 定义流处理的拓扑结构

build.addSource("SOURCE", from)

.addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")

.addSink("SINK", to, "PROCESS");

KafkaStreams streams = new KafkaStreams(build, settings);

streams.start();

}

}

  将项目键入jar包中,将其上传到服务器,然后使用java -cp ****。jar cn.javayuli.kafkastream.KafkaStreamApp运行项目。

  推荐服务

  StreamingRecommender(非SpringBoot项目)

  此处仅使用邮件,不进行推荐计算。

  主要功能:

  package cn.javayuli.streamrecommender;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

/**

* @author hanguilin

*/

public class ConsumerApp {

public static void main(String[] args){

Properties properties = new Properties();

properties.put("bootstrap.servers", "192.168.1.43:9092");

properties.put("group.id", "group-1");

properties.put("enable.auto.commit", "true");

properties.put("auto.commit.interval.ms", "1000");

properties.put("auto.offset.reset", "earliest");

properties.put("session.timeout.ms", "30000");

properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);

kafkaConsumer.subscribe(Arrays.asList("recommender"));

while (true) {

ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));

for (ConsumerRecord record : records) {

System.out.printf("offset = %d, value = %s", record.offset(), record.value());

System.out.println();

}

}

}

}

  将项目键入一个jar包中,将其上传到服务器,然后使用java -cp ****。jar cn.javayuli.streamrecommender.ConsumerApp运行该项目。

  如果需要在非服务器上远程测试程序,则需要打开服务器的7001(BusinessServer)和9092(Kafka)端口。有关端口命令,请参阅文章“ CentOS7端口命令”。

  数据模拟

  发送评分请求:

  

  首先,产品服务将打印出日志:

  

  查看/home/hadoop/flume/log/agent.log

  

  如您所见,商品服务将日志附加到/home/hadoop/flume/log/agent.log文件。

  这时,Flume检测到文件内容已更改,并将其他内容发送到Kafka日志主题。

  这时,清洁服务从日志主题中获取收录PRODUCT_RATING_PREFIX的日志信息,并将处理后的信息发送到推荐主题。

  (下图显示从日志中取出的数据,而不是已处理的数据)

  

  由于推荐服务订阅了推荐者主题,因此使用了该消息。

  

  资源地址

  只有关键代码发布在文章中,请检查git信息库“推荐”中的所有代码。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线