解决方案:基于K8S部署filebeat及logstash并输出到java程序中

优采云 发布时间: 2022-11-27 23:46

  解决方案:基于K8S部署filebeat及logstash并输出到java程序中

  从 K8S 集群采集

容器日志并集中存储。

  溶液:

  1、守护进程集

  运行文件节拍

  作为守护进程,Filebeat 会在 JAVA 程序中采集

通过 logstash 发送的日志,然后由 JAVA 程序对其进行处理并集中存储。

  2、边车

  每个 POD 都增加了一个额外的 Filebeat 容器,Filebeat 读取相应的日志,并通过日志共享将其发送到 JAVA 程序。

  这两种方法可以共存而不会发生冲突。DaemonSet 方法采集容器的标准输出,如果有特殊要求,可以通过 sidecar 方法自定义采集日志。

  下面介绍了用于采集

容器日志的 daemonSet 方法的内容:

  首先粘贴 K8S 部署的 yaml 文件:

  # 创建账户

apiVersion: v1

kind: ServiceAccount

metadata:

labels:

k8s-app: itsm-node-manager

name: itsm-node-manager

namespace: kube-system

---

# 创建角色

apiVersion: rbac.authorization.k8s.io/v1

kind: ClusterRole

metadata:

labels:

k8s-app: itsm-node-manager

name: itsm-node-manager-role

namespace: kube-system

rules:

- apiGroups:

- ""

resources:

- nodes

- namespaces

- events

- pods

verbs:

- get

- list

- watch

---

# 账户与角色绑定

apiVersion: rbac.authorization.k8s.io/v1

kind: ClusterRoleBinding

metadata:

name: itsm-node-manager-role-binding

namespace: kube-system

roleRef:

apiGroup: rbac.authorization.k8s.io

kind: ClusterRole

name: itsm-node-manager-role

subjects:

- kind: ServiceAccount

name: itsm-node-manager

namespace: kube-system

---

# 创建logstash配置文件

apiVersion: v1

kind: ConfigMap

metadata:

labels:

k8s-app: itsm-node-manager

name: logstash-config

namespace: kube-system

data:

logstash.yml: 'config.reload.automatic: true'

pipeline.conf: |-

input {

beats {

port => 5044

codec => json

}

}

filter {

}

output {

http {

http_method => "post"

format => "json"

# 此处配置程序的url路径,java代码会在下面贴出来。如果调用的是集群内部的程序,可以采用和filebeat一样的域名方式

url => "http://192.168.0.195:8080/containerLog/insert"

content_type => "application/json"

}

}

---

# 创建logstash

apiVersion: apps/v1

kind: Deployment

metadata:

name: logstash

namespace: kube-system

labels:

server: logstash-7.10.1

spec:

selector:

matchLabels:

<p>

" />

k8s-app: logstash

template:

metadata:

creationTimestamp: null

labels:

k8s-app: logstash

name: logstash

spec:

containers:

- image: elastic/logstash:7.10.1

imagePullPolicy: IfNotPresent

name: logstash

securityContext:

procMount: Default

runAsUser: 0

volumeMounts:

- mountPath: /usr/share/logstash/config/logstash.yml

name: logstash-config

readOnly: true

subPath: logstash.yml

- mountPath: /usr/share/logstash/pipeline/logstash.conf

name: logstash-config

readOnly: true

subPath: pipeline.conf

dnsPolicy: ClusterFirst

restartPolicy: Always

schedulerName: default-scheduler

securityContext: {}

terminationGracePeriodSeconds: 120

imagePullSecrets:

- name: dockerpull

volumes:

- configMap:

defaultMode: 420

name: logstash-config

name: logstash-config

---

# 创建logstash service

apiVersion: v1

kind: Service

metadata:

labels:

k8s-app: logstash

name: logstash

namespace: kube-system

spec:

type: ClusterIP

selector:

k8s-app: logstash

ports:

- port: 5044

protocol: TCP

targetPort: 5044

---

# 创建filebeat配置文件

apiVersion: v1

kind: ConfigMap

metadata:

labels:

k8s-app: itsm-node-manager

name: filebeat-config

namespace: kube-system

data:

filebeat.yml: |-

filebeat.autodiscover:

providers:

- type: kubernetes

host: ${NODE_NAME}

hints.enabled: true

hints.default_config:

type: container

paths:

- /var/log/containers/*${data.kubernetes.container.id}.log

processors:

- add_cloud_metadata:

- add_host_metadata:

output.logstash:

hosts: ["logstash.kube-system.svc.cluster.local:5044"] # kubectl -n logs get svc

enabled: true

---

# 创建filebeat守护进程

apiVersion: apps/v1

kind: DaemonSet

metadata:

name: filebeat

namespace: kube-system

labels:

server: filebeat-7.10.1

spec:

selector:

matchLabels:

name: filebeat

kubernetes.io/cluster-service: "true"

template:

metadata:

creationTimestamp: null

  

" />

labels:

name: filebeat

kubernetes.io/cluster-service: "true"

spec:

containers:

- args:

- -c

- /etc/filebeat.yml

- -e

env:

- name: NODE_NAME

valueFrom:

fieldRef:

apiVersion: v1

fieldPath: spec.nodeName

image: elastic/filebeat:7.10.1

imagePullPolicy: IfNotPresent

name: filebeat

resources:

limits:

memory: 200Mi

requests:

cpu: 100m

memory: 100Mi

securityContext:

procMount: Default

runAsUser: 0

volumeMounts:

- mountPath: /etc/filebeat.yml

name: config

readOnly: true

subPath: filebeat.yml

- mountPath: /usr/share/filebeat/data

name: data

- mountPath: /var/lib/docker/containers

name: varlibdockercontainers

readOnly: true

- mountPath: /var/log

name: varlog

readOnly: true

restartPolicy: Always

serviceAccount: itsm-node-manager

serviceAccountName: itsm-node-manager

volumes:

- configMap:

defaultMode: 384

name: filebeat-config

name: config

- hostPath:

path: /var/lib/docker/containers

type: ""

name: varlibdockercontainers

- hostPath:

path: /var/log

type: ""

name: varlog

- hostPath:

path: /opt/filebeat/data

type: DirectoryOrCreate

name: data

</p>

  这是将多个部署信息放在一个 YAML 文件中,用“---”分隔。

  以下是 JAVA 代码片段:

  @Api(tags = "服务日志控制类")

@Slf4j

@RestController

@RequestMapping("/containerLog")

public class ContainerLogController {

@Autowired

private ContainerLogService containerLogService;

@ApiOperation(value = "容器日志写入接口",produces = "application/json", response = String.class)

@PostMapping("insert")

public Result insert(HttpServletRequest httpServletRequest){

BufferedReader br = null;

StringBuilder sb = new StringBuilder("");

try {

br = httpServletRequest.getReader();

String str;

while ((str=br.readLine())!=null){

sb.append(str);

}

containerLogService.insert(sb.toString());

} catch (IOException e) {

e.printStackTrace();

}

return Result.newSuccess();

}

}

  此时,您可以获取 logstash 发送的日志信息,并且容器日志均为 JSON 格式。

  您可以在三个位置扩展以满足您的需求:

  1. 文件节拍采集

规则

  2. 日志存储过滤规则

  3. 程序处理逻辑

  最佳实践:pytest文档83 - 把收集的 yaml 文件转 Item 用例并运行

  前言

  上一篇文章通过用例采集

挂钩pytest_collect_file采集

YAML 文件,但只采集

用例,无法执行。

  接下来,详细解释如何将 yaml 文件的内容转换为要执行的 Item 用例。

  pytest_collect_file 采集

钩子

  准备 YAML 文件内容 test_login.yml

  name: login case1<br />request:<br /> url: http://127.0.0.1:8000/api/v1/login/<br /> method: POST<br /> headers:<br /> Content-Type: application/json<br /> json:<br /> username: test<br /> password: 123456

  首先将集合钩子写 conftest.py

  def pytest_collect_file(file_path: Path, parent):<br /> # 获取文件.yml 文件,匹配规则<br /> if file_path.suffix == ".yml" and file_path.name.startswith("test"):<br /> return pytest.File.from_parent(parent, path=file_path)

  如果采集

到 yaml 文件中,则返回 pytest。File.from_parent(父级,路径=file_path),运行时将报告错误

  ============================================ ERRORS ============================================<br />_____________________________ ERROR collecting case/test_login.yml _____________________________<br />venv\lib\site-packages\_pytest\runner.py:339: in from_call<br /> result: Optional[TResult] = func()<br />venv\lib\site-packages\_pytest\runner.py:370: in <br /> call = CallInfo.from_call(lambda: list(collector.collect()), "collect")<br />venv\lib\site-packages\_pytest\nodes.py:536: in collect<br /> raise NotImplementedError("abstract")<br />E NotImplementedError: abstract<br />=================================== short test summary info ====================================<br />ERROR case/test_login.yml - NotImplementedError: abstract<br />!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!

  

" />

  该错误在 nodes.py 文件的 collect() 方法中报告,因此请在 nodes.py 中找到采集

  class Collector(Node):<br /> """Collector instances create children through collect() and thus<br /> iteratively build a tree."""<br /><br /> class CollectError(Exception):<br /> """An error during collection, contains a custom message."""<br /><br /> def collect(self) -> Iterable[Union["Item", "Collector"]]:<br /> """Return a list of children (items and collectors) for this<br /> collection node."""<br /> raise NotImplementedError("abstract")

  由于 collect() 方法

  为空,它直接引发异常 NotImplementError(“abstract”),因此我们需要覆盖 collect() 方法

  YamlFile 重写 collect()。

  对应于 YamlFile 类,继承 ytest。文件,它覆盖 collect() 方法

  class YamlFile(pytest.File):<br /><br /> def collect(self):<br /> """返回读取内容的Iterable 可迭代对象"""<br /> raw = yaml.safe_load(self.fspath.open(encoding='utf-8'))<br /> print(raw)<br /> # raw 是读取 yml 数据的内容<br /> yield pytest.Item.from_parent(self, name=raw.get('name'), values=raw)

  再次运行 pytest

  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _<br /><br />item = <br /><br /> def pytest_runtest_call(item: Item) -> None:<br /> _update_current_test_var(item, "call")<br /> try:<br /> del sys.last_type<br /> del sys.last_value<br /> del sys.last_traceback<br /> except AttributeError:<br /> pass<br /> try:<br />> item.runtest()<br /><br />venv\lib\site-packages\_pytest\runner.py:167:<br />_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _<br /><br />self = <br /><br /> def runtest(self) -> None:<br /> """Run the test case for this item.<br /><br /> Must be implemented by subclasses.<br /><br /> .. seealso:: :ref:`non-python tests`<br /> """<br />> raise NotImplementedError("runtest must be implemented by Item subclass")<br />E NotImplementedError: runtest must be implemented by Item subclass<br /><br />venv\lib\site-packages\_pytest\nodes.py:733: NotImplementedError

  这次发生的错误在 runner.py 文件中报告,并且通过执行 runtest() 方法 NotImplementError(“runtest 必须由 Item 子类实现”)引发的异常

  

" />

  )。

  看到这里,就意味着用例 Item 已经生成,并且在执行时,没有定义执行 yaml 文件的方法,因此报告了一个错误

  所以我在 nodes.py 中找到了 Item(Node) 类

  class Item(Node):<br /> """A basic test invocation item.<br /><br /> Note that for a single function there might be multiple test invocation items.<br /> """<br /><br /> def runtest(self) -> None:<br /> """Run the test case for this item.<br /><br /> Must be implemented by subclasses.<br /><br /> .. seealso:: :ref:`non-python tests`<br /> """<br /> raise NotImplementedError("runtest must be implemented by Item subclass")

  接下来,您需要重写 Item 中的运行测试以执行用例

  重写项目的运行测试

  您最终看到执行yaml文件的简短版本的界面用例 conftest.py 如下

  import pytest<br />import requests<br />import yaml<br />from pathlib import Path<br /><br />def pytest_collect_file(file_path: Path, parent):<br /> # 获取文件.yml 文件,匹配规则<br /> if file_path.suffix == ".yml" and file_path.name.startswith("test"):<br /> return YamlFile.from_parent(parent, path=file_path)<br /><br />class YamlFile(pytest.File):<br /><br /> def collect(self):<br /> """返回读取内容的Iterable 可迭代对象"""<br /> raw = yaml.safe_load(self.fspath.open(encoding='utf-8'))<br /> print(raw)<br /> # raw 是读取 yml 数据的内容<br /> yield YamlTest.from_parent(self, name=raw.get('name'), values=raw)<br /><br />class YamlTest(pytest.Item):<br /> def __init__(self, name, parent, values):<br /> super(YamlTest, self).__init__(name, parent)<br /> self.name = name<br /> self.values = values<br /> self.s = requests.session()<br /><br /> def runtest(self) -> None:<br /> """运行用例"""<br /> request_data = self.values["request"]<br /> response = self.s.request(**request_data)<br /> print("\n", response.text)

  输入pytest,您可以看到yaml文件作为用例执行

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线