自选网关脚本配置

星原网关

星原网关完全按标准化流程进行,即可使用默认 数据对接脚本(物联平台生成)

    jsonQuery: [params,{"device_id":deviceId},{"timestamp":time}].@join
    时间格式: unix_ms

https://www.yuque.com/u22138663/bfte2b/lbntzt (opens in a new tab)

领祺网关

    jsonQuery: [{"id":id},{"PublicAddr":data.@keys.0},{"data":data.*},{"type":type},{"sn":sn},{"timestamp":time},{"tagid":tagid},{"value":value},{"result":result}].@join
    时间格式: 2006-01-02 15:04:05

特殊配置

网关模板的设备类key要改成LingQiGateway,部分老项目不能修改的可以联系开发

网关配置

网关子设备: {设备sn}@{采集上传公共地址}

##设备配置

具体网关设备->技术参数:添加 参数名:sn  参数值: 【真实设备sn】
具体网关子设备->技术参数:添加 参数名:PublicAddr  参数值: 【真实采集上传公共地址】

脚本


输入 变更

[[inputs.mqtt_consumer]]
    name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    servers = ["${EMQX_BROKER_SERVER}"]
    topics = [
    ${TEMPLATE_UP_TOPIC}
    ]
    client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@${TEMPLATE_NAME}${DEVICE_TYPE}@input\"}"
    username = "${EMQX_BROKER_USERNAME}"
    password = "${EMQX_BROKER_PASSWORD}"
    persistent_session = true
    data_format = "json"
    json_query = "${TEMPLATE_JSON_QUERY}"
    tag_keys = ["device_id","PublicAddr","sn","type","tagid","value","id"]
    json_string_fields = ["data*","result",${STRING_FIELD}]
    json_time_key = "${TIMESTAMP}"
    json_time_format= "${TIMESTAMP_FORMAT}"

自定义1 新增

[[processors.starlark]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@LinQistar"
    order = ${order@100}
    source = '''
def apply(metric):
    metric.time=metric.time-28800000000000
    if 'type' in metric.tags.keys() and 'sn' in metric.tags.keys() and 'PublicAddr' in metric.tags.keys() and 'real' == metric.tags.get("type"):
        device_key=metric.tags.get("sn")+'@'+metric.tags.get("PublicAddr")
        metric.tags.clear()
        metric.tags.setdefault("device_key",device_key)
        newFields=dict()
        for data in metric.fields.keys():
            datalist=data.split('_')
            if 'id' == datalist[2] and  len(datalist) >= 3:
                value= datalist[0]+'_'+datalist[1]+'_'+'value'
                newvalue=float(metric.fields.get(value))
                newFields.setdefault(metric.fields.get(data),newvalue)
        metric.fields.clear()
        for data in newFields.keys():
            metric.fields.setdefault(data,newFields.get(data))
    elif 'type' in metric.tags.keys() and 'sn' in metric.tags.keys() and 'topic' in metric.tags.keys() and 'writeack' == metric.tags.get("type"):
        topics=metric.tags.get("topic").split("up/gateway/")
        if len(topics) == 2:
            metric.name='up/callback/gateway/'+topics[1]
    return metric
'''

自定义2 新增

[[processors.enum]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
    order = ${order@110}
    [[processors.enum.mapping]]
        tag = "device_key"
        dest ="device_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_NAME_TO_DEVICE_ID_MAP}

打标 不变

[[processors.enum]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
    order = ${order@200}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="device_name"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_DEVICE_NAME_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="organzation_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_ORGANZATION_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="space_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_SPACE_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="template_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TEMPLATE_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="tenant_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TENANT_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="template_name"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TEMPLATE_NAME_MAP}

转发 不变 20230320之后弃用

[[processors.starlark]]
    namepass=["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}@gateway@forward"
    order = ${order@300}
    source = '''
def apply(metric):
    if 'device_id' in metric.tags.keys() and 'template_name' in metric.tags.keys() and 'device_name' in metric.tags.keys() and 'tenant_id' in metric.tags.keys():
        metric.name = 'up/device/'+metric.tags["tenant_id"]+'/'+metric.tags["template_name"] + '/' + metric.tags["device_name"]
        device_id=metric.tags.get("device_id")
        metric.tags.clear()
        metric.tags.setdefault("device_id",device_id)
    return metric
'''

输出 新增

[[outputs.mqtt]]
    flush_interval="1s"
    namepass=["up/callback/*"]
    alias = "output@mqtt@callback"
    servers = ["${EMQX_BROKER_OUT_SERVER}"]
    topic_prefix = ""
    client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@callbacksocket@output\"}"
    username = "${EMQX_BROKER_USERNAME}"
    password = "${EMQX_BROKER_PASSWORD}"
    batch = false
    data_format = "json"
    json_timestamp_units = "1ms"

中移物联网kafka对接

    jsonQuery: [data,{"productId":productId},{"deviceName":deviceName},{"messageType":messageType},{"notifyType":notifyType}].@join
    时间格式: unix_ms

子设备name配置

    子设备: {productId}@{deviceName}

脚本

输入 变更

brokers kafka地址 topics kafka topic sasl_username 用户名 sasl_password 密码 sasl_mechanism 验证方式 client_id 客户端id consumer_group 消费组 json_string_fields 根据实际情况配置,所有string/bool类型 属性 加前缀”params_”以及后缀”_value”

[[inputs.kafka_consumer]]
    name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    brokers = ["192.168.110.18:9092","192.168.110.19:9092","192.168.110.20:9092"]
    topics = ["uXno_yjdl"]
    client_id = "yjdl-test"
    sasl_username = "fcuMpfvm"
    sasl_password = "azOAzBteDOGOHaVw"
    sasl_mechanism = "SCRAM-SHA-512"
    consumer_group= "yjdl-test"
    offset = "newest"
    max_message_len = 1000000
    data_format = "json"
    json_query = "${TEMPLATE_JSON_QUERY}"
    tag_keys = ["productId","deviceName","messageType","notifyType"]
    json_string_fields = ["params_meterCode_value","params_venderCode_value","params_hwVersion_value","params_swVersion_value","params_currentTime_value","params_lowVoltageAlarm_value","params_measureErrorAlarm_value","params_supplyTempAlarm_value","params_returnTempAlarm_value","params_setReportTime_value","params_storeErrorAlarm_value","params_userID_value","params_alarmState_value","params_controlMode_value","params_deviceCode_value","params_deviceName_value","params_deviceState_value","params_sampleTime_value","params_remoteControl_value","params_valueState_value","params_batchControl_value","params_comStat_value","params_deviceNum_value"]
    json_timezone = "Asia/Shanghai"

自定义 新增

[[processors.starlark]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@oneNetInput"
    order = ${order@100}
    source = '''
def apply(metric):
    if 'messageType' in metric.tags.keys() and 'notifyType' in metric.tags.keys() and 'productId' in metric.tags.keys() and 'deviceName' in metric.tags.keys() and 'notify' == metric.tags.get("messageType") and 'property' == metric.tags.get("notifyType"):
        device_key=metric.tags.get("productId")+'@'+metric.tags.get("deviceName")
        metric.tags.clear()
        metric.tags.setdefault("device_key",device_key)
        newFields=dict()
        for data in metric.fields.keys():
            datalist=data.split('_')
            datalen=len(datalist)
            if datalen >= 3 and 'value' == datalist[datalen-1] and 'params' == datalist[0]:
                newkey=""
                for i in range(datalen - 2):
                   newkey += datalist[i+1] + "_"
                newkey = newkey[:-1]
                newvalue=metric.fields.get(data)
                newFields.setdefault(newkey,newvalue)
        metric.fields.clear()
        for data in newFields.keys():
            metric.fields.setdefault(data,newFields.get(data))
    else:
        metric.name=""
        metric.tags.clear()
        metric.fields.clear()
    return metric
'''
'''

自定义 新增

[[processors.enum]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
    order = ${order@110}
    [[processors.enum.mapping]]
        tag = "device_key"
        dest ="device_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_NAME_TO_DEVICE_ID_MAP}

领祺网关2(20231018更新)

    jsonQuery: [{"id":id},{"data":data*},{"type":type},{"sn":sn},{"timestamp":time},{"tagid":tagid},{"value":value},{"result":result}].@join
    时间格式: 2006-01-02 15:04:05

比较领祺网关

采集修复 单网关下多公共地址设备 无法使用同一转发通道上传问题 从领祺网关脚本更新:

  • 更新 网关设备类->编辑->jsonQuery
  • 更新 网关设备类->详情->脚本->网关脚本-> 替换 “自定义1 新增” 脚本

同时添加 新旧 领祺网关

脚本”输出 新增” 不要重复添加,租户内所有设备类 只能存在一份

网关子配置

    网关子设备: {设备sn}@{采集上传公共地址}

##设备配置

    仅采集无需配置技术参数,控制配置技术参数参考控制文档

脚本

输入 变更

[[inputs.mqtt_consumer]]
    name_override = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}"
    servers = ["${EMQX_BROKER_SERVER}"]
    topics = [
    ${TEMPLATE_UP_TOPIC}
    ]
    client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@${TEMPLATE_NAME}${DEVICE_TYPE}@input\"}"
    username = "${EMQX_BROKER_USERNAME}"
    password = "${EMQX_BROKER_PASSWORD}"
    persistent_session = true
    data_format = "json"
    json_query = "${TEMPLATE_JSON_QUERY}"
    tag_keys = ["device_id","PublicAddr","sn","type","tagid","value","id"]
    json_string_fields = ["data*","result",${STRING_FIELD}]
    json_time_key = "${TIMESTAMP}"
    json_time_format= "${TIMESTAMP_FORMAT}"

自定义1 新增

[[processors.starlark]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@LinQistar"
    order = ${order@100}
    source = '''
def apply(metric):
    metric.time=metric.time-28800000000000
    if 'type' in metric.tags.keys() and 'sn' in metric.tags.keys() and 'real' == metric.tags.get("type"):
        deviceMap=dict()
        metrics=[]
        sn=metric.tags.get("sn")
        for data in metric.fields.keys():
            datalist=data.split('_')
            if len(datalist) >= 4 and 'data'==datalist[0] and 'id' == datalist[3]:
                publicAddr=datalist[1]
                valueKey= datalist[0]+'_'+datalist[1]+'_'+datalist[2]+'_'+'value'
                newvalue=float(metric.fields.get(valueKey))
                if publicAddr not in deviceMap:
                    deviceMap.setdefault(publicAddr,dict())
                deviceFields=deviceMap.get(publicAddr)
                deviceFields.setdefault(metric.fields.get(data),newvalue)
                deviceMap.setdefault(publicAddr,deviceFields)
        if len(deviceMap) > 0:
            for device in deviceMap.keys():
                device_key=sn+'@'+device
                newMetric=Metric(metric.name)
                newMetric.time=metric.time
                newMetric.tags.setdefault("device_key",device_key)
                deviceNewFields=deviceMap.get(device)
                for data in deviceNewFields.keys():
                    newMetric.fields.setdefault(data,deviceNewFields.get(data))
                metrics.append(newMetric)
        return metrics
    elif 'type' in metric.tags.keys() and 'sn' in metric.tags.keys() and 'topic' in metric.tags.keys() and 'writeack' == metric.tags.get("type"):
        topics=metric.tags.get("topic").split("up/gateway/")
        if len(topics) == 2:
            metric.name='up/callback/gateway/'+topics[1]
    return metric
'''

自定义2 新增

[[processors.enum]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
    order = ${order@110}
    [[processors.enum.mapping]]
        tag = "device_key"
        dest ="device_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_NAME_TO_DEVICE_ID_MAP}

打标 不变

[[processors.enum]]
    namepass= ["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}${DEVICE_TYPE}@marking"
    order = ${order@200}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="device_name"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_DEVICE_NAME_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="organzation_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_ORGANZATION_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="space_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_SPACE_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="template_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TEMPLATE_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="tenant_id"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TENANT_ID_MAP}
    [[processors.enum.mapping]]
        tag = "device_id"
        dest ="template_name"
        [processors.enum.mapping.value_mappings]
            ${DEVICE_ID_TO_TEMPLATE_NAME_MAP}

转发 不变

[[processors.starlark]]
    namepass=["${TEMPLATE_NAME}${DEVICE_TYPE}"]
    alias = "${TEMPLATE_NAME}@gateway@forward"
    order = ${order@300}
    source = '''
def apply(metric):
    if 'device_id' in metric.tags.keys() and 'template_name' in metric.tags.keys() and 'device_name' in metric.tags.keys() and 'tenant_id' in metric.tags.keys():
        metric.name = 'up/device/'+metric.tags["tenant_id"]+'/'+metric.tags["template_name"] + '/' + metric.tags["device_name"]
        device_id=metric.tags.get("device_id")
        metric.tags.clear()
        metric.tags.setdefault("device_id",device_id)
    return metric
'''

输出 新增 20230320之后弃用

[[outputs.mqtt]]
    flush_interval="1s"
    namepass=["up/callback/*"]
    alias = "output@mqtt@callback"
    servers = ["${EMQX_BROKER_OUT_SERVER}"]
    topic_prefix = ""
    client_id = "{\"userId\":${EMQX_BORKER_USER_ID},\"telegraf\":\"${TENANT_ID}@callbacksocket@output\"}"
    username = "${EMQX_BROKER_USERNAME}"
    password = "${EMQX_BROKER_PASSWORD}"
    batch = false
    data_format = "json"
    json_timestamp_units = "1ms"