规则引擎
输入数据
从概念上来说,RuleEngine 处理的是一个元组的数据流(元组包含 message, timestamp, topic, message type),当编写规则时,使用者可以通过相应变量来访问输入数据(msg, ts, topic, msgtype)。
根据处理的文件类型,coScene agent 将会根据下列图表填充变量:
Bag file (ROS, Mcap,etc) | Log file | |
---|---|---|
msg: any | Message 数据 | 由log文件中的一行信息封装成的 Foxglove.Log |
ts: float | 时间戳 | 从单行log中解析出的时间戳 |
topic: str | Topic 名称 | log文件名 |
msgtype:str | 消息类型 | Foxglove.Log |
注:
- 时间戳为自 1970/01/01 00:00:00 经历的秒数,float 类型
- 时间戳的解析将尽力完成,相关信息见支持的时间戳格式
msg
的结构取决于输入数据的类型,比如一个 rosgraph_msgs/Log 类型的msg
,它的结构就会如定义所示,下面是一个例子:这样比如想要获取msg: {
header: {
seq: 1,
stamp: {
secs: 12345678,
nsecs: 0
},
frame_id: 'demo_frame'
},
level: 2,
name: 'demo_node',
msg: 'Hello World',
file: 'demo file',
function: 'demo function',
line: 10,
topics: ['/demo_topic'],
}msg
中的msg
字段,可以使用msg.msg
,想要获取msg
中的frame_id
字段,可以使用msg.header.frame_id
,以此类推。
Log 数据
为了方便读取日志数据,我们为以下 log 数据类型提供了额外的提取器 (log
、log_level
)
- Protobuf or JSON:
foxglove.Log
- ROS:
foxglove_msgs/Log
- ROS:
rosgraph_msgs/Log
log
: 过滤支持的消息类型,并且返回 log 内容
log == 'Error occurred!' # 当一行日志信息为 'Error occurred!'时触发
log_level
: 过滤支持的消息类型,并返回 log 的等级
用户可以使用 log_level 枚举不同的日志等级
- INFO
- WARN
- ERROR
- FATAL
log_level == LogLevel.FATAL # 每当出现 FATAL 日志时触发
运算符
==
!=
>
>=
<
<=
a and b
a or b
not a
a in b
序列匹配器
-
sustained
sustained(context, condition, duration)
当给定
condition
为真的持续时间超过duration
时,条件会被触发。 (context
是用于评估condition
的消息过滤器。)# 下面的语法会监测所有 topic 为 ‘/velocity’的消息,
# 如果其 liner.x 的值大于 20 且持续时间大于 10 s
# 该条件将会被触发
sustained(
topic == '/velocity',
msg.linear.x > 20,
10
)
-
repeated
repeated(condition, times, duration)
当给定
condition
在给定的duration
内被触发time
次时,条件会被触发。# 当日志中包含‘Error 12345 happened’,且在 60 秒内出现了 5 次,条件被触发
repeated(
'Error 12345 happened' in log,
5,
60
)
-
sequential
sequential(c1, c2, ..., duration)
sequential 可以设定任意数量的条件,如果在以秒为单位的
duration
内,所有的条件都按顺序触发,条件会被触发。如果没有设置
duration
,则没有时间限制# 初始化全部执行,且时间小于 10 秒,则被触发
sequential(
'Initialization start' in log,
'All modules loaded' in log,
'Localization initialized' in log,
'Initialization complete' in log,
duration=10
)
# 任务失败则被触发,无时间限制
sequential(
'Task started' in log,
'Task failed' in log
)
-
throttle
throttle(condition, duration)
condition
在以秒为单位的duration
内触发不多于 1 次,如果duration
内多次被触发,仅第一次满足condition
的数据会触发该条件throttle(
"123" in log,
2
)
-
debounce
debounce(condition, duration)
condition
被满足时会被触发。在这个语法中包含了一个定时器,如果在以秒为单位的duration
内condition
再次被触发,debounce
不会被触发,且定时器会被重置# 当监测到‘Error 12345’时会被触发,‘Error 12345’在 2 秒内多次触发时,
# 仅第一次监测到的‘Error 12345’会被触发
debounce(
'Error 12345' in log,
2
)
-
any_order
any_order(condition1, condition2, ..., reset_time)
所有 condition 被触发,无视触发数顺序
如果
reset_time
被设置,如果在第一个conditioni
被触发后reset_time
时间内没有触发所有的conditioni
,则状态被重置。any_order("123" in log, "789" in log, "456" in log)
any_order("123" in log, "789" in log, "456" in log, reset_time=10)
-
timeout
timeout(condition1, condition2, ..., duration)
所有的
condition1
触发后,剩余的condition
未能在duration
时间段内被依次触发,则该条件触发# 如果所有的condition没有在duration时间段内被触发,则该条件被触发
timeout("123" in log, "456" in log, duration=1)
值传递
get_value(key)
和 set_value(key, value)
两个方法用于在不同的条件之间进行值传递。
# Triggers if a request completes within the given time.
# Requests are matched by id
sequential(
set_value('req_id', regex(log, 'Send request (\\d+)').group(1)),
regex(log, 'Request (\\d+) completed').group(1) == get_value('req_id'),
duration=10
)
功能函数
-
concat(...)
: 字符串连接concat('a', 'b', 123) # 'ab123'
-
regex(value, pattern)
: 监测value
与pattern
是否匹配如果成功匹配,则返回 Python
Match
对象regex('Current speed is 20', 'speed is (\\d+)').group(1) # 20