星原网关
星原网关完全按标准化流程对接,直接使用物联平台生成的默认数据对接脚本即可。
jsonQuery: [params,{"device_id":deviceId},{"timestamp":time}].@join
时间格式: unix_ms
https://www.yuque.com/u22138663/bfte2b/lbntzt
领祺网关
jsonQuery: [{"id":id},{"PublicAddr":data.@keys.0},{"data":data.*},{"type":type},{"sn":sn},{"timestamp":time},{"tagid":tagid},{"value":value},{"result":result}].@join
时间格式: 2006-01-02 15:04:05
特殊配置
网关模板的设备类key要改成LingQiGateway,部分老项目不能修改的可以联系开发
网关配置
网关子设备: {设备sn}@{采集上传公共地址}
## 设备配置
具体网关设备->技术参数:添加 参数名:sn 参数值: 【真实设备sn】
具体网关子设备->技术参数:添加 参数名:PublicAddr 参数值: 【真实采集上传公共地址】
脚本
输入 变更
[[inputs.mqtt_consumer]]
name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
servers = ["${EMQX_BROKER_SERVER}"]
topics = [
${TEMPLATE_UP_TOPIC}
]
client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@${TEMPLATE_NAME}${DEVICE_TYPE}@input\"}"
username = "${EMQX_BROKER_USERNAME}"
password = "${EMQX_BROKER_PASSWORD}"
persistent_session = true
data_format = "json"
json_query = "${TEMPLATE_JSON_QUERY}"
tag_keys = ["device_id","PublicAddr","sn","type","tagid","value","id"]
json_string_fields = ["data*","result",${STRING_FIELD}]
json_time_key = "${TIMESTAMP}"
json_time_format= "${TIMESTAMP_FORMAT}"
自定义1 新增
[[processors.starlark]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@LinQistar"
order = ${order@100}
source = '''
def apply(metric):
    # Normalise one LingQi gateway message.
    #
    # - type == "real": tags are reduced to a single "device_key" tag
    #   ("<sn>@<PublicAddr>") and the fields are rebuilt so that every
    #   "<a>_<b>_id" / "<a>_<b>_value" pair becomes one field named after
    #   the id's value and holding the value converted to float.
    # - type == "writeack": the metric is renamed to
    #   "up/callback/gateway/<rest of topic>" when the topic contains
    #   exactly one "up/gateway/" separator.
    # - anything else: only the time shift below is applied.
    #
    # Returns the (mutated) input metric.
    #
    # NOTE(review): the documented script had lost its indentation; the
    # nesting here is reconstructed from the syntax - confirm against the
    # deployed script.

    # Shift the timestamp back by 8 hours (28800 s expressed in ns).
    # Presumably the gateway reports UTC+8 wall-clock time that is parsed
    # as UTC - TODO confirm.
    metric.time = metric.time - 28800000000000

    kind = metric.tags.get("type")
    if kind == 'real' and 'sn' in metric.tags and 'PublicAddr' in metric.tags:
        device_key = metric.tags.get("sn") + '@' + metric.tags.get("PublicAddr")
        metric.tags.clear()
        metric.tags.setdefault("device_key", device_key)

        points = dict()
        for key in metric.fields.keys():
            parts = key.split('_')
            # Check the length BEFORE indexing: keys with fewer than three
            # parts (e.g. "result") must be skipped, not raise an error.
            if len(parts) >= 3 and parts[2] == 'id':
                raw = metric.fields.get(parts[0] + '_' + parts[1] + '_value')
                # Skip ids without a matching value instead of failing in
                # float(None). Starlark has no "is", hence "!= None".
                if raw != None:
                    # setdefault keeps the first value seen for an id.
                    points.setdefault(metric.fields.get(key), float(raw))

        metric.fields.clear()
        for point, value in points.items():
            metric.fields[point] = value
    elif kind == 'writeack' and 'sn' in metric.tags and 'topic' in metric.tags:
        pieces = metric.tags.get("topic").split("up/gateway/")
        if len(pieces) == 2:
            metric.name = 'up/callback/gateway/' + pieces[1]
    return metric
'''
自定义2 新增
[[processors.enum]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
order = ${order@110}
[[processors.enum.mapping]]
tag = "device_key"
dest ="device_id"
[processors.enum.mapping.value_mappings]
${DEVICE_NAME_TO_DEVICE_ID_MAP}
打标 不变
[[processors.enum]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
order = ${order@200}
[[processors.enum.mapping]]
tag = "device_id"
dest ="device_name"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_DEVICE_NAME_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="organzation_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_ORGANZATION_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="space_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_SPACE_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="template_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TEMPLATE_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="tenant_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TENANT_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="template_name"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TEMPLATE_NAME_MAP}
转发 不变 20230320之后弃用
[[processors.starlark]]
namepass=["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}@gateway@forward"
order = ${order@300}
source = '''
def apply(metric):
    # Route a tagged metric to its per-device topic.
    #
    # When the metric carries all of device_id, template_name, device_name
    # and tenant_id, it is renamed to
    # "up/device/<tenant_id>/<template_name>/<device_name>" and its tags are
    # reduced to device_id only. Otherwise it is returned unchanged.
    #
    # NOTE(review): the documented script had lost its indentation; the
    # nesting here is reconstructed from the syntax - confirm against the
    # deployed script.
    tags = metric.tags
    routable = (
        'device_id' in tags and
        'template_name' in tags and
        'device_name' in tags and
        'tenant_id' in tags
    )
    if routable:
        metric.name = 'up/device/' + tags["tenant_id"] + '/' + tags["template_name"] + '/' + tags["device_name"]
        device_id = tags.get("device_id")
        tags.clear()
        tags.setdefault("device_id", device_id)
    return metric
'''
输出 新增
[[outputs.mqtt]]
flush_interval="1s"
namepass=["up/callback/*"]
alias = "output@mqtt@callback"
servers = ["${EMQX_BROKER_OUT_SERVER}"]
topic_prefix = ""
client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@callbacksocket@output\"}"
username = "${EMQX_BROKER_USERNAME}"
password = "${EMQX_BROKER_PASSWORD}"
batch = false
data_format = "json"
json_timestamp_units = "1ms"
中移物联网kafka对接
jsonQuery: [data,{"productId":productId},{"deviceName":deviceName},{"messageType":messageType},{"notifyType":notifyType}].@join
时间格式: unix_ms
子设备name配置
子设备: {productId}@{deviceName}
脚本
输入 变更
参数说明:brokers — kafka 地址;topics — kafka topic;sasl_username — 用户名;sasl_password — 密码;sasl_mechanism — 验证方式;client_id — 客户端 id;consumer_group — 消费组;json_string_fields — 根据实际情况配置,所有 string/bool 类型的属性名需加前缀“params_”和后缀“_value”。
# Kafka consumer input for the China Mobile IoT (OneNET) integration.
# Messages are parsed as JSON using the template's json_query; productId,
# deviceName, messageType and notifyType are promoted to tags.
[[inputs.kafka_consumer]]
name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
# NOTE(review): the broker addresses, topic, client/group ids and SASL
# credentials below are literal values rather than ${...} placeholders.
# If they are real, they should not live in documentation in plain text -
# confirm, rotate if needed, and replace with per-project values.
brokers = ["192.168.110.18:9092","192.168.110.19:9092","192.168.110.20:9092"]
topics = ["uXno_yjdl"]
client_id = "yjdl-test"
sasl_username = "fcuMpfvm"
sasl_password = "azOAzBteDOGOHaVw"
sasl_mechanism = "SCRAM-SHA-512"
consumer_group= "yjdl-test"
offset = "newest"
max_message_len = 1000000
data_format = "json"
json_query = "${TEMPLATE_JSON_QUERY}"
tag_keys = ["productId","deviceName","messageType","notifyType"]
# Every string/bool property must be listed here as "params_<name>_value";
# adjust this list to the properties of the actual product.
json_string_fields = ["params_meterCode_value","params_venderCode_value","params_hwVersion_value","params_swVersion_value","params_currentTime_value","params_lowVoltageAlarm_value","params_measureErrorAlarm_value","params_supplyTempAlarm_value","params_returnTempAlarm_value","params_setReportTime_value","params_storeErrorAlarm_value","params_userID_value","params_alarmState_value","params_controlMode_value","params_deviceCode_value","params_deviceName_value","params_deviceState_value","params_sampleTime_value","params_remoteControl_value","params_valueState_value","params_batchControl_value","params_comStat_value","params_deviceNum_value"]
# NOTE(review): no json_time_key / json_time_format is set in this block,
# so json_timezone presumably has nothing to apply to - verify.
json_timezone = "Asia/Shanghai"
自定义 新增
[[processors.starlark]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@oneNetInput"
order = ${order@100}
source = '''
def apply(metric):
    # Normalise one OneNET (China Mobile IoT) Kafka message.
    #
    # Property notifications (messageType == "notify" and
    # notifyType == "property", with productId and deviceName present):
    #   - tags are reduced to a single "device_key" tag,
    #     "<productId>@<deviceName>";
    #   - every field named "params_<name>_value" is renamed to "<name>"
    #     (<name> may itself contain underscores); all other fields are
    #     discarded.
    # Any other message is blanked: empty name, no tags, no fields.
    #
    # Returns the (mutated) input metric.
    #
    # NOTE(review): the documented script had lost its indentation; the
    # nesting here is reconstructed from the syntax - confirm against the
    # deployed script.
    tags = metric.tags
    is_property_report = (
        tags.get("messageType") == 'notify' and
        tags.get("notifyType") == 'property' and
        'productId' in tags and
        'deviceName' in tags
    )
    if is_property_report:
        device_key = tags.get("productId") + '@' + tags.get("deviceName")
        tags.clear()
        tags.setdefault("device_key", device_key)

        props = dict()
        for key in metric.fields.keys():
            parts = key.split('_')
            if len(parts) >= 3 and parts[0] == 'params' and parts[-1] == 'value':
                # Strip the "params" prefix and "value" suffix, keeping any
                # underscores inside the property name itself.
                props.setdefault("_".join(parts[1:-1]), metric.fields.get(key))

        metric.fields.clear()
        for prop, value in props.items():
            metric.fields[prop] = value
    else:
        metric.name = ""
        metric.tags.clear()
        metric.fields.clear()
    return metric
'''
自定义 新增
[[processors.enum]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
order = ${order@110}
[[processors.enum.mapping]]
tag = "device_key"
dest ="device_id"
[processors.enum.mapping.value_mappings]
${DEVICE_NAME_TO_DEVICE_ID_MAP}
领祺网关2(20231018更新)
jsonQuery: [{"id":id},{"data":data*},{"type":type},{"sn":sn},{"timestamp":time},{"tagid":tagid},{"value":value},{"result":result}].@join
时间格式: 2006-01-02 15:04:05
比较领祺网关
修复采集问题:单网关下多个公共地址的设备无法使用同一转发通道上传。从领祺网关脚本更新的步骤:
- 更新 网关设备类->编辑->jsonQuery
- 更新 网关设备类->详情->脚本->网关脚本-> 替换 “自定义1 新增” 脚本
同时添加新旧领祺网关时:
脚本“输出 新增”不要重复添加,租户内所有设备类只能存在一份。
网关子配置
网关子设备: {设备sn}@{采集上传公共地址}
## 设备配置
仅采集无需配置技术参数,控制配置技术参数参考控制文档
脚本
输入 变更
[[inputs.mqtt_consumer]]
name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
servers = ["${EMQX_BROKER_SERVER}"]
topics = [
${TEMPLATE_UP_TOPIC}
]
client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@${TEMPLATE_NAME}${DEVICE_TYPE}@input\"}"
username = "${EMQX_BROKER_USERNAME}"
password = "${EMQX_BROKER_PASSWORD}"
persistent_session = true
data_format = "json"
json_query = "${TEMPLATE_JSON_QUERY}"
tag_keys = ["device_id","PublicAddr","sn","type","tagid","value","id"]
json_string_fields = ["data*","result",${STRING_FIELD}]
json_time_key = "${TIMESTAMP}"
json_time_format= "${TIMESTAMP_FORMAT}"
自定义1 新增
[[processors.starlark]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@LinQistar"
order = ${order@100}
source = '''
def apply(metric):
    # Split one LingQi gateway message into one metric per public address.
    #
    # - type == "real": fields named "data_<addr>_<n>_id" are paired with
    #   "data_<addr>_<n>_value". For every <addr> a new metric is built with
    #   the same name and (shifted) time, a single "device_key" tag
    #   ("<sn>@<addr>"), and one float field per id. A list of those new
    #   metrics is returned in place of the input metric.
    # - type == "writeack": the metric is renamed to
    #   "up/callback/gateway/<rest of topic>" when the topic contains
    #   exactly one "up/gateway/" separator, and returned.
    # - anything else: returned with only the time shift applied.
    #
    # NOTE(review): the documented script had lost its indentation; the
    # nesting here is reconstructed from the syntax. In particular
    # "return metrics" is placed at branch level, so a "real" message with
    # no usable points yields an empty list - confirm against the deployed
    # script.

    # Shift the timestamp back by 8 hours (28800 s expressed in ns).
    # Presumably the gateway reports UTC+8 wall-clock time that is parsed
    # as UTC - TODO confirm.
    metric.time = metric.time - 28800000000000

    kind = metric.tags.get("type")
    if kind == 'real' and 'sn' in metric.tags:
        sn = metric.tags.get("sn")

        # public address -> {point id: float value}
        by_addr = dict()
        for key in metric.fields.keys():
            parts = key.split('_')
            if len(parts) >= 4 and parts[0] == 'data' and parts[3] == 'id':
                raw = metric.fields.get("_".join(parts[:3]) + '_value')
                # Skip ids without a matching value instead of failing in
                # float(None). Starlark has no "is", hence "!= None".
                if raw != None:
                    points = by_addr.setdefault(parts[1], dict())
                    # setdefault keeps the first value seen for an id.
                    points.setdefault(metric.fields.get(key), float(raw))

        metrics = []
        for addr, points in by_addr.items():
            child = Metric(metric.name)
            child.time = metric.time
            child.tags.setdefault("device_key", sn + '@' + addr)
            for point, value in points.items():
                child.fields[point] = value
            metrics.append(child)
        return metrics
    elif kind == 'writeack' and 'sn' in metric.tags and 'topic' in metric.tags:
        pieces = metric.tags.get("topic").split("up/gateway/")
        if len(pieces) == 2:
            metric.name = 'up/callback/gateway/' + pieces[1]
    return metric
'''
自定义2 新增
[[processors.enum]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
order = ${order@110}
[[processors.enum.mapping]]
tag = "device_key"
dest ="device_id"
[processors.enum.mapping.value_mappings]
${DEVICE_NAME_TO_DEVICE_ID_MAP}
打标 不变
[[processors.enum]]
namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
order = ${order@200}
[[processors.enum.mapping]]
tag = "device_id"
dest ="device_name"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_DEVICE_NAME_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="organzation_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_ORGANZATION_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="space_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_SPACE_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="template_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TEMPLATE_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="tenant_id"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TENANT_ID_MAP}
[[processors.enum.mapping]]
tag = "device_id"
dest ="template_name"
[processors.enum.mapping.value_mappings]
${DEVICE_ID_TO_TEMPLATE_NAME_MAP}
转发 不变
[[processors.starlark]]
namepass=["${TEMPLATE_NAME}${DEVICE_TYPE}"]
alias = "${TEMPLATE_NAME}@gateway@forward"
order = ${order@300}
source = '''
def apply(metric):
    # Route a tagged metric to its per-device topic.
    #
    # When the metric carries all of device_id, template_name, device_name
    # and tenant_id, it is renamed to
    # "up/device/<tenant_id>/<template_name>/<device_name>" and its tags are
    # reduced to device_id only. Otherwise it is returned unchanged.
    #
    # NOTE(review): the documented script had lost its indentation; the
    # nesting here is reconstructed from the syntax - confirm against the
    # deployed script.
    tags = metric.tags
    routable = (
        'device_id' in tags and
        'template_name' in tags and
        'device_name' in tags and
        'tenant_id' in tags
    )
    if routable:
        metric.name = 'up/device/' + tags["tenant_id"] + '/' + tags["template_name"] + '/' + tags["device_name"]
        device_id = tags.get("device_id")
        tags.clear()
        tags.setdefault("device_id", device_id)
    return metric
'''
输出 新增 20230320之后弃用
[[outputs.mqtt]]
flush_interval="1s"
namepass=["up/callback/*"]
alias = "output@mqtt@callback"
servers = ["${EMQX_BROKER_OUT_SERVER}"]
topic_prefix = ""
client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@callbacksocket@output\"}"
username = "${EMQX_BROKER_USERNAME}"
password = "${EMQX_BROKER_PASSWORD}"
batch = false
data_format = "json"
json_timestamp_units = "1ms"