Giter VIP home page Giter VIP logo

huaweicloud-amqp-adapter's Introduction

img

img

1. Intro

Huaweicloud-amqp-adapter是什么?

Huaweicloud-amqp-adapter是一个遵循Apache Qpid JMS及其SDK和AMQP协议开发的AMQP Consumer客户端工具,用于接入华为云IoTDA平台并接收平台传输的数据。

Huaweicloud-amqp-adapter有什么?

项目有两个主要分支——masteradapter-master:

  1. adapter-master 分支为AMQP Consumer部分。项目主分支,用于单独部署:
    • adpater——AMQP Consumer部分
    • web——将Consumer接收到的数据处理并入库;根据需求向外暴露HTTP接口供查询
  2. master 分支为毕设项目。项目副分支,[iot-system-v1.0.0] 包含以下功能:
    • adpater——AMQP Consumer部分
    • web——将Consumer接收到的数据处理并入库;响应前端查询请求,对数据库进行查询操作
    • security——用户登录注册部分,采用 Spring Security 开发

2. 目录结构

src/main
|── java
|   |── edu.hrbust.iot.amqp
|       |── HuaWeiCloudAmqpAdapterApplication.java  # SpringBoot 启动类
|       |── adapter                                 # AMQP Consumer部分
|       |   |── base                                ## 核心代码包
|       |   |   |── AmqpAdapter.java                ### 核心代码类
|       |   |   |── AmqpConfig.java                 ### 华为云物联网平台的配置信息
|       |   |   |── QpidConnectionListener.java     ### Connection的监听类
|       |   |── consumer                            ## 存放消费者类
|       |   |   |── QpidConsumer.java               ### Consumer模版类,可自主选择是否继承
|       |   |   |── HealthConsumer.java             ### 消费者样例
|       |   |   |── QpidJmsTemplate.java            ### AmqpAdapter的封装核心工具类
|       |   |── entity                              ## 存放自定义消息属性类
|       |       |── common                          ### 存放工具类
|       |       |   |── BaseMessageTemplate.java    #### 云平台消息JSON格式转换模版类,需继承
|       |       |   |── Properties.java             #### 云平台消息属性模板类,需继承
|       |       |   |── utils                       ### 存放云平台消息工具类
|       |       |       |── Body.java               #### 消息体,包含一个Services列表
|       |       |       |── Header.java             #### 消息头
|       |       |       |── NotifyData.java         #### 消息包,包含消息体和消息头
|       |       |       |── Services.java           #### 消息内容,包含Properties类及其子类
|       |       |── heart                           ### 存放自定义消息样例包
|       |           |── HealthAmqpMsg.java          #### 继承BaseMessageTemplate<HealthProperties>
|       |           |── HealthAmqpMsgConverter.java #### 将AMQP消息转换为Service可以处理的DTO
|       |           |── HealthProperties.java       #### 继承Properties并定义需要的属性
|       |── web                                     # Web后端部分
|            |── controller                         ## Controller控制层,响应HTTP请求
|            |── dao                                ## DAO持久化层,与DB交互
|            |── entity                             ## 实体包
|            |   |── heart                          ### 样例包,存放PO、DTO和VO
|            |── service                            ## Service逻辑层,定义接口
|            |   |── impl                           ### 实现类包,实现在Service层定义的接口
|            |── utils                              ## 工具包
|                 |── common                        ### 基础工具类
|                 |── converter                     ### 三层架构中各层间数据转换工具
|── resources                                     
    |── application.properties                      # 配置文件

3. 开发流程

按照项目目录结构,开发需分为adapterweb两部分进行

  • adapter
    1. 填写云平台配置信息
    2. 创建用于接收消息的实体类和属性转换工具类
    3. 编写AMQP Consumer与云平台建立连接并接收数据,调用web入库操作
  • web
    1. 编写与adapter部分消息实体类属性对应的POJO(VO、DTO和PO)
    2. 编写Vo<->DTODTO<->PO两个converter
    3. 编写Controller、Service和DAO层的实现代码

3.1 adapter

3.1.1 配置信息

华为云IoTDA平台上完成设备字段映射和接入配置之后,将配置信息填入application.properties配置文件

huawei.qpid.accessKey=
huawei.qpid.accessCode=
huawei.qpid.broker-url=amqps://${huawei.qpid.UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN
huawei.qpid.UUCID=
huawei.qpid.queueName=

3.1.2 创建实体类

华为云物联网平台会将设备上报的数据流按照配置好的字段映射封装成JSON格式传输,云平台消息样例如下:

{
  "resource": "device.property",
  "event": "report",
  "event_time": "20210521T122548Z",
  "notify_data": {
    "header": {
      "app_id": "e6c196d40df84846bacbd9573dabb685",
      "device_id": "5fd6df9937f2a30303b55693_863434047705329",
      "node_id": "863434047705329",
      "product_id": "5fd6df9937f2a30303b55693",
      "gateway_id": "5fd6df9937f2a30303b55693_863434047705329"
    },
    "body": {
      "services": [{
        "service_id": "HeartRateDeviceService",
        "properties": {
          "properties":"properties",
          ...
        },
        "event_time": "20210521T122548Z"
      }]
    }
  }
}

将JSON格式转换成实体类的方案有很多种,流行的有Alibaba fastjsonFastXML jackson,两者在分析JSON数据时都需要指定一个目标实体类,才能将JSON数据按照字段名一一复制给对应的实体属性。

鉴于华为云IoTDA平台规定的JSON数据中只有properties字段是需要自定义的,因此只需规范此字段。

adapter.entity包下新建一个实体包并创建以下三个类(e.g. health包)

  1. 属性类 extends Properties
  2. AMQP 消息接收类 extends BaseMessageTemplate<P extends Properties>
  3. 消息转换工具类

开发自定义实体类时,只需继承adapter.entity.common包中已将冗余信息解耦的BaseMessageTemplate类并定义相应的Properties类传入即可。

// 属性类
public class HealthProperties extends Properties {
    // properties
    ...
}

// AMQP 消息接收类
public class HealthAmqpMsg extends BaseMessageTemplate<HealthProperties> {}

// 消息转换工具类
public class HealthAmqpMsgConverter {
    public HealthDataDTO convertToDTO(HealthAmqpMsg healthData){
        HealthDataDTO dto = new HealthDataDTO();
        // build header
        Header header = healthData.getNotifyData().getHeader();
        dto.setAppId(header.getAppId());
        dto.setDeviceId(header.getDeviceId());
        dto.setNodeId(header.getNodeId());
        dto.setProductId(header.getProductId());
        dto.setGatewayId(header.getGatewayId());

        // build body
        Body<HealthProperties> body = healthData.getNotifyData().getBody();
        Services<HealthProperties> services =  body.getServices().get(0);
        dto.setServiceId(services.getServiceId());
        HealthProperties healthProperties = services.getProperties();

        // build properties
        
        return dto;
    }
}

3.1.3 创建消费者

adapter.consumer包中已定义了一个消费者抽象模板类QpidConsumer,并且自动注入了工具类QpidJmsTemplate。可以自主选择继承QpidConsumer类或直接使用QpidJmsTemplate工具类进行消息接收。

// 开发消费者
public class HealthConsumer extends QpidConsumer {

    @Autowired
    private HealthService heartHealthService;

    @Autowired
    private HealthAmqpMsgConverter heartConverter;

    private void save(HealthAmqpMsg healthData){
        HealthDataDTO healthDataDTO = heartConverter.convertToDTO(healthData);
        heartHealthService.save(healthDataDTO);
    }

    @Scheduled(fixedDelay = 300000)
    public void receiveHealthData() {
        log.info("[定时任务开始], {}", LocalDateTime.now());
        List<HealthAmqpMsg> healthData = qpidJmsTemplate.receiveAndConvertToJson(HealthAmqpMsg.class);
        log.info("共接收到 [{}] 条消息", healthData.size());

        if (!healthData.isEmpty()) {
            log.info("[准备入库], {}, 入库信息为: {}",LocalDateTime.now(), healthData);
            healthData.forEach(this::save);
            log.info("[已入库,定时任务结束], {} ", LocalDateTime.now());
        }
    }
}

3.2 web

3.2.1 编写POJO

「PO」Persistant Object 持久层对象
@Data   //lombok注释,用于自动生成getter和setter
@Entity //javax.persistence注释,用于注释PO表明其为数据库实体类
// 类名需和表名对应,此类对应health_data表
public class HealthData extends BasePO { 
    // header
    private String appId;
    private String deviceId;
    private String nodeId;
    private String productId;
    private String gatewayId;

    // body.services
    private String serviceId;
    private Date eventTime;
    
    // properties
    ...
}
「DTO」Data Transfer Object 数据传输对象
@Data
@AllArgsConstructor
@NoArgsConstructor
// 用于在DAO和AMQP Consumer与Service层间传递消息
public class HealthDataDTO extends BaseDTO { 
    // header
    private String appId;
    private String deviceId;
    private String nodeId;
    private String productId;
    private String gatewayId;

    // body.services
    private String serviceId;
    private Date eventTime;
  
    // properties
    ...
}
「VO」Value Object 值对象
@Data
@AllArgsConstructor
@NoArgsConstructor
// 响应HTTP请求时返回的数据
public class HealthDataVO implements Serializable { 
    // 前端需要展示的数据字段
    ...
}

3.2.2 编写转换工具类

VO<->DTODTO<->PO间复制数据需要两个转换工具。

本项目通过 java.reflect 提供的反射工具和Spring提供的 BeanUtils 工具进行类型转换时的属性复制,并封装在 BaseConverter 接口中。

编写工具类的时候只需实现上述接口并指定 源类SOURCE目标类TARGET 即可:

HealthDVConverter
//Vo<->DTO
public class HealthDVConverter 
  implements BaseConverter<HealthDataDTO, HealthDataVO> {}
HealthPDConverter
//DTO<->PO
public class HealthPDConverter 
  implements BaseConverter<HealthData, HealthDataDTO> {}

3.2.3 编写三层架构

Controller
@CrossOrigin
@RestController
@RequestMapping("/health")
public class HealthController {

    @Autowired
    private HealthService healthService;

    @Autowired
    private HealthDVConverter converter;
		
  	// 监听/health/queryPage的HTTP请求
    // Page为分页信息类,WebResponse为统一返回类
    @RequestMapping(value = "/queryPage", method = RequestMethod.POST)
    public WebResponse<Page<HealthDataVO>> queryPage(@RequestBody(required = false) AmqpQuery amqpQuery){
        PageDTO<HealthDataDTO> pageDTO = healthService.queryPage(amqpQuery);
        return WebResponse.success(Page.from(pageDTO, dto ->converter.toTarget(dto)));
    }
}
Service
// 面向接口开发,定义规范
package edu.hrbust.iot.amqp.web.service
public interface HealthService {

    PageDTO<HealthDataDTO> queryPage(AmqpQuery amqpQuery);

    void save(HealthDataDTO healthDataDTO);

    List<HealthDataDTO> queryAll();
}

// 接口实现类
package edu.hrbust.iot.amqp.web.service.impl
public class DefaultHealthService implements HealthService {
    
    @Autowired
    private HealthDataRepository repository;

    @Autowired
    private HealthPDConverter converter;
  
    //编写具体的实现代码
    @Override
    public PageDTO<HealthDataDTO> queryPage(AmqpQuery amqpQuery){}

  	@Override
    public void save(HealthDataDTO healthDataDTO){}
  
    @Override
    public List<HealthDataDTO> queryAll(){}
  
}
Dao
// Dao层采用Spring Data JPA实现,只需继承JpaRepository即可实现简单的数据库查找功能
// JpaSpecificationExecutor接口提供分页功能
public interface HealthDataRepository extends
        JpaRepository<HealthData, Long>, JpaSpecificationExecutor<HealthData> {}

3.3 数据库

3.3.1 实现

create table health_data
(
    id              int auto_increment, 
    uid             varchar(255) not null,
    service_id      varchar(255) null,
    max_heart_rate  int          not null,
    min_heart_rate  int          not null,
    aver_heart_rate int          not null,
    start_time      varchar(255) not null,
    end_time        varchar(255) not null,
    start_date      varchar(255) null,
    end_date        varchar(255) null,
    event_time      datetime     null,
    created_time    datetime     null,
    app_id          varchar(255) null,
    node_id         varchar(255) null,
    product_id      varchar(255) null,
    gateway_id      varchar(255) null,
    device_id       varchar(255) null,
    primary key(id),
);

huaweicloud-amqp-adapter's People

Contributors

wwwwwwwxqxq avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.