解决方案:基于K8S部署filebeat及logstash并输出到java程序中
优采云 发布时间: 2022-11-27 23:46解决方案:基于K8S部署filebeat及logstash并输出到java程序中
从 K8S 集群采集
容器日志并集中存储。
溶液:
1、守护进程集
运行文件节拍
作为守护进程,Filebeat 会在 JAVA 程序中采集
通过 logstash 发送的日志,然后由 JAVA 程序对其进行处理并集中存储。
2、边车
每个 POD 都增加了一个额外的 Filebeat 容器,Filebeat 读取相应的日志,并通过日志共享将其发送到 JAVA 程序。
这两种方法可以共存而不会发生冲突。DaemonSet 方法采集容器的标准输出,如果有特殊要求,可以通过 sidecar 方法自定义采集日志。
下面介绍了用于采集
容器日志的 daemonSet 方法的内容:
首先粘贴 K8S 部署的 yaml 文件:
# 创建账户
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
k8s-app: itsm-node-manager
name: itsm-node-manager
namespace: kube-system
---
# 创建角色
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
k8s-app: itsm-node-manager
name: itsm-node-manager-role
namespace: kube-system
rules:
- apiGroups:
- ""
resources:
- nodes
- namespaces
- events
- pods
verbs:
- get
- list
- watch
---
# 账户与角色绑定
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: itsm-node-manager-role-binding
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: itsm-node-manager-role
subjects:
- kind: ServiceAccount
name: itsm-node-manager
namespace: kube-system
---
# 创建logstash配置文件
apiVersion: v1
kind: ConfigMap
metadata:
labels:
k8s-app: itsm-node-manager
name: logstash-config
namespace: kube-system
data:
logstash.yml: 'config.reload.automatic: true'
pipeline.conf: |-
input {
beats {
port => 5044
codec => json
}
}
filter {
}
output {
http {
http_method => "post"
format => "json"
# 此处配置程序的url路径,java代码会在下面贴出来。如果调用的是集群内部的程序,可以采用和filebeat一样的域名方式
url => "http://192.168.0.195:8080/containerLog/insert"
content_type => "application/json"
}
}
---
# 创建logstash
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash
namespace: kube-system
labels:
server: logstash-7.10.1
spec:
selector:
matchLabels:
<p>
" />
k8s-app: logstash
template:
metadata:
creationTimestamp: null
labels:
k8s-app: logstash
name: logstash
spec:
containers:
- image: elastic/logstash:7.10.1
imagePullPolicy: IfNotPresent
name: logstash
securityContext:
procMount: Default
runAsUser: 0
volumeMounts:
- mountPath: /usr/share/logstash/config/logstash.yml
name: logstash-config
readOnly: true
subPath: logstash.yml
- mountPath: /usr/share/logstash/pipeline/logstash.conf
name: logstash-config
readOnly: true
subPath: pipeline.conf
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
terminationGracePeriodSeconds: 120
imagePullSecrets:
- name: dockerpull
volumes:
- configMap:
defaultMode: 420
name: logstash-config
name: logstash-config
---
# 创建logstash service
apiVersion: v1
kind: Service
metadata:
labels:
k8s-app: logstash
name: logstash
namespace: kube-system
spec:
type: ClusterIP
selector:
k8s-app: logstash
ports:
- port: 5044
protocol: TCP
targetPort: 5044
---
# 创建filebeat配置文件
apiVersion: v1
kind: ConfigMap
metadata:
labels:
k8s-app: itsm-node-manager
name: filebeat-config
namespace: kube-system
data:
filebeat.yml: |-
filebeat.autodiscover:
providers:
- type: kubernetes
host: ${NODE_NAME}
hints.enabled: true
hints.default_config:
type: container
paths:
- /var/log/containers/*${data.kubernetes.container.id}.log
processors:
- add_cloud_metadata:
- add_host_metadata:
output.logstash:
hosts: ["logstash.kube-system.svc.cluster.local:5044"] # kubectl -n logs get svc
enabled: true
---
# 创建filebeat守护进程
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: filebeat
namespace: kube-system
labels:
server: filebeat-7.10.1
spec:
selector:
matchLabels:
name: filebeat
kubernetes.io/cluster-service: "true"
template:
metadata:
creationTimestamp: null
" />
labels:
name: filebeat
kubernetes.io/cluster-service: "true"
spec:
containers:
- args:
- -c
- /etc/filebeat.yml
- -e
env:
- name: NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
image: elastic/filebeat:7.10.1
imagePullPolicy: IfNotPresent
name: filebeat
resources:
limits:
memory: 200Mi
requests:
cpu: 100m
memory: 100Mi
securityContext:
procMount: Default
runAsUser: 0
volumeMounts:
- mountPath: /etc/filebeat.yml
name: config
readOnly: true
subPath: filebeat.yml
- mountPath: /usr/share/filebeat/data
name: data
- mountPath: /var/lib/docker/containers
name: varlibdockercontainers
readOnly: true
- mountPath: /var/log
name: varlog
readOnly: true
restartPolicy: Always
serviceAccount: itsm-node-manager
serviceAccountName: itsm-node-manager
volumes:
- configMap:
defaultMode: 384
name: filebeat-config
name: config
- hostPath:
path: /var/lib/docker/containers
type: ""
name: varlibdockercontainers
- hostPath:
path: /var/log
type: ""
name: varlog
- hostPath:
path: /opt/filebeat/data
type: DirectoryOrCreate
name: data
</p>
这是将多个部署信息放在一个 YAML 文件中,用“---”分隔。
以下是 JAVA 代码片段:
@Api(tags = "服务日志控制类")
@Slf4j
@RestController
@RequestMapping("/containerLog")
public class ContainerLogController {
@Autowired
private ContainerLogService containerLogService;
@ApiOperation(value = "容器日志写入接口",produces = "application/json", response = String.class)
@PostMapping("insert")
public Result insert(HttpServletRequest httpServletRequest){
BufferedReader br = null;
StringBuilder sb = new StringBuilder("");
try {
br = httpServletRequest.getReader();
String str;
while ((str=br.readLine())!=null){
sb.append(str);
}
containerLogService.insert(sb.toString());
} catch (IOException e) {
e.printStackTrace();
}
return Result.newSuccess();
}
}
此时,您可以获取 logstash 发送的日志信息,并且容器日志均为 JSON 格式。
您可以在三个位置扩展以满足您的需求:
1. 文件节拍采集
规则
2. 日志存储过滤规则
3. 程序处理逻辑
最佳实践:pytest文档83 - 把收集的 yaml 文件转 Item 用例并运行
前言
上一篇文章通过用例采集
挂钩pytest_collect_file采集
YAML 文件,但只采集
用例,无法执行。
接下来,详细解释如何将 yaml 文件的内容转换为要执行的 Item 用例。
pytest_collect_file 采集
钩子
准备 YAML 文件内容 test_login.yml
name: login case1<br />request:<br /> url: http://127.0.0.1:8000/api/v1/login/<br /> method: POST<br /> headers:<br /> Content-Type: application/json<br /> json:<br /> username: test<br /> password: 123456
首先将集合钩子写 conftest.py
def pytest_collect_file(file_path: Path, parent):<br /> # 获取文件.yml 文件,匹配规则<br /> if file_path.suffix == ".yml" and file_path.name.startswith("test"):<br /> return pytest.File.from_parent(parent, path=file_path)
如果采集
到 yaml 文件中,则返回 pytest。File.from_parent(父级,路径=file_path),运行时将报告错误
============================================ ERRORS ============================================<br />_____________________________ ERROR collecting case/test_login.yml _____________________________<br />venv\lib\site-packages\_pytest\runner.py:339: in from_call<br /> result: Optional[TResult] = func()<br />venv\lib\site-packages\_pytest\runner.py:370: in <br /> call = CallInfo.from_call(lambda: list(collector.collect()), "collect")<br />venv\lib\site-packages\_pytest\nodes.py:536: in collect<br /> raise NotImplementedError("abstract")<br />E NotImplementedError: abstract<br />=================================== short test summary info ====================================<br />ERROR case/test_login.yml - NotImplementedError: abstract<br />!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!
" />
该错误在 nodes.py 文件的 collect() 方法中报告,因此请在 nodes.py 中找到采集
器
class Collector(Node):<br /> """Collector instances create children through collect() and thus<br /> iteratively build a tree."""<br /><br /> class CollectError(Exception):<br /> """An error during collection, contains a custom message."""<br /><br /> def collect(self) -> Iterable[Union["Item", "Collector"]]:<br /> """Return a list of children (items and collectors) for this<br /> collection node."""<br /> raise NotImplementedError("abstract")
由于 collect() 方法
为空,它直接引发异常 NotImplementError(“abstract”),因此我们需要覆盖 collect() 方法
YamlFile 重写 collect()。
对应于 YamlFile 类,继承 ytest。文件,它覆盖 collect() 方法
class YamlFile(pytest.File):<br /><br /> def collect(self):<br /> """返回读取内容的Iterable 可迭代对象"""<br /> raw = yaml.safe_load(self.fspath.open(encoding='utf-8'))<br /> print(raw)<br /> # raw 是读取 yml 数据的内容<br /> yield pytest.Item.from_parent(self, name=raw.get('name'), values=raw)
再次运行 pytest
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _<br /><br />item = <br /><br /> def pytest_runtest_call(item: Item) -> None:<br /> _update_current_test_var(item, "call")<br /> try:<br /> del sys.last_type<br /> del sys.last_value<br /> del sys.last_traceback<br /> except AttributeError:<br /> pass<br /> try:<br />> item.runtest()<br /><br />venv\lib\site-packages\_pytest\runner.py:167:<br />_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _<br /><br />self = <br /><br /> def runtest(self) -> None:<br /> """Run the test case for this item.<br /><br /> Must be implemented by subclasses.<br /><br /> .. seealso:: :ref:`non-python tests`<br /> """<br />> raise NotImplementedError("runtest must be implemented by Item subclass")<br />E NotImplementedError: runtest must be implemented by Item subclass<br /><br />venv\lib\site-packages\_pytest\nodes.py:733: NotImplementedError
这次发生的错误在 runner.py 文件中报告,并且通过执行 runtest() 方法 NotImplementError(“runtest 必须由 Item 子类实现”)引发的异常
" />
)。
看到这里,就意味着用例 Item 已经生成,并且在执行时,没有定义执行 yaml 文件的方法,因此报告了一个错误
所以我在 nodes.py 中找到了 Item(Node) 类
class Item(Node):<br /> """A basic test invocation item.<br /><br /> Note that for a single function there might be multiple test invocation items.<br /> """<br /><br /> def runtest(self) -> None:<br /> """Run the test case for this item.<br /><br /> Must be implemented by subclasses.<br /><br /> .. seealso:: :ref:`non-python tests`<br /> """<br /> raise NotImplementedError("runtest must be implemented by Item subclass")
接下来,您需要重写 Item 中的运行测试以执行用例
重写项目的运行测试
您最终看到执行yaml文件的简短版本的界面用例 conftest.py 如下
import pytest<br />import requests<br />import yaml<br />from pathlib import Path<br /><br />def pytest_collect_file(file_path: Path, parent):<br /> # 获取文件.yml 文件,匹配规则<br /> if file_path.suffix == ".yml" and file_path.name.startswith("test"):<br /> return YamlFile.from_parent(parent, path=file_path)<br /><br />class YamlFile(pytest.File):<br /><br /> def collect(self):<br /> """返回读取内容的Iterable 可迭代对象"""<br /> raw = yaml.safe_load(self.fspath.open(encoding='utf-8'))<br /> print(raw)<br /> # raw 是读取 yml 数据的内容<br /> yield YamlTest.from_parent(self, name=raw.get('name'), values=raw)<br /><br />class YamlTest(pytest.Item):<br /> def __init__(self, name, parent, values):<br /> super(YamlTest, self).__init__(name, parent)<br /> self.name = name<br /> self.values = values<br /> self.s = requests.session()<br /><br /> def runtest(self) -> None:<br /> """运行用例"""<br /> request_data = self.values["request"]<br /> response = self.s.request(**request_data)<br /> print("\n", response.text)
输入pytest,您可以看到yaml文件作为用例执行