mirror of
				https://github.com/YunaiV/ruoyi-vue-pro.git
				synced 2025-11-04 08:06:12 +08:00 
			
		
		
		
	feat: 【IoT 物联网】数据流转目的 TCP 执行器
This commit is contained in:
		@ -17,6 +17,8 @@ import lombok.Data;
 | 
			
		||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
 | 
			
		||||
@JsonSubTypes({
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkHttpConfig.class, name = "1"),
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkTcpConfig.class, name = "2"),
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkWebSocketConfig.class, name = "3"),
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkRedisConfig.class, name = "21"),
 | 
			
		||||
        @JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,63 @@
 | 
			
		||||
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * IoT TCP 配置 {@link IotAbstractDataSinkConfig} 实现类
 | 
			
		||||
 *
 | 
			
		||||
 * @author HUIHUI
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * TCP 服务器地址
 | 
			
		||||
     */
 | 
			
		||||
    private String host;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * TCP 服务器端口
 | 
			
		||||
     */
 | 
			
		||||
    private Integer port;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 连接超时时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Integer connectTimeoutMs = 5000;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 读取超时时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Integer readTimeoutMs = 10000;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 是否启用 SSL
 | 
			
		||||
     */
 | 
			
		||||
    private Boolean ssl = false;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * SSL 证书路径(当 ssl=true 时需要)
 | 
			
		||||
     */
 | 
			
		||||
    private String sslCertPath;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 数据格式:JSON 或 BINARY
 | 
			
		||||
     */
 | 
			
		||||
    private String dataFormat = "JSON";
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 心跳间隔时间(毫秒),0 表示不启用心跳
 | 
			
		||||
     */
 | 
			
		||||
    private Long heartbeatIntervalMs = 30000L;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 重连间隔时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Long reconnectIntervalMs = 5000L;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 最大重连次数
 | 
			
		||||
     */
 | 
			
		||||
    private Integer maxReconnectAttempts = 3;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,87 @@
 | 
			
		||||
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * IoT WebSocket 配置 {@link IotAbstractDataSinkConfig} 实现类
 | 
			
		||||
 * <p>
 | 
			
		||||
 * 配置设备消息通过 WebSocket 协议发送到外部 WebSocket 服务器
 | 
			
		||||
 * 支持 WebSocket (ws://) 和 WebSocket Secure (wss://) 连接
 | 
			
		||||
 *
 | 
			
		||||
 * @author HUIHUI
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * WebSocket 服务器地址
 | 
			
		||||
     * 例如:ws://localhost:8080/ws 或 wss://example.com/ws
 | 
			
		||||
     */
 | 
			
		||||
    private String serverUrl;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 连接超时时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Integer connectTimeoutMs = 5000;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送超时时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Integer sendTimeoutMs = 10000;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 心跳间隔时间(毫秒),0 表示不启用心跳
 | 
			
		||||
     */
 | 
			
		||||
    private Long heartbeatIntervalMs = 30000L;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 心跳消息内容(JSON 格式)
 | 
			
		||||
     */
 | 
			
		||||
    private String heartbeatMessage = "{\"type\":\"heartbeat\"}";
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 子协议列表(逗号分隔)
 | 
			
		||||
     */
 | 
			
		||||
    private String subprotocols;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 自定义请求头(JSON 格式)
 | 
			
		||||
     */
 | 
			
		||||
    private String customHeaders;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 是否启用 SSL 证书验证(仅对 wss:// 生效)
 | 
			
		||||
     */
 | 
			
		||||
    private Boolean verifySslCert = true;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 数据格式:JSON 或 TEXT
 | 
			
		||||
     */
 | 
			
		||||
    private String dataFormat = "JSON";
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 重连间隔时间(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Long reconnectIntervalMs = 5000L;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 最大重连次数
 | 
			
		||||
     */
 | 
			
		||||
    private Integer maxReconnectAttempts = 3;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 是否启用压缩
 | 
			
		||||
     */
 | 
			
		||||
    private Boolean enableCompression = false;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息发送重试次数
 | 
			
		||||
     */
 | 
			
		||||
    private Integer sendRetryCount = 1;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息发送重试间隔(毫秒)
 | 
			
		||||
     */
 | 
			
		||||
    private Long sendRetryIntervalMs = 1000L;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,97 @@
 | 
			
		||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * TCP 的 {@link IotDataRuleAction} 实现类
 | 
			
		||||
 * <p>
 | 
			
		||||
 * 负责将设备消息发送到外部 TCP 服务器
 | 
			
		||||
 * 支持普通 TCP 和 SSL TCP 连接,支持 JSON 和 BINARY 数据格式
 | 
			
		||||
 * 使用连接池管理 TCP 连接,提高性能和资源利用率
 | 
			
		||||
 *
 | 
			
		||||
 * @author HUIHUI
 | 
			
		||||
 */
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class IotTcpDataRuleAction extends
 | 
			
		||||
        IotDataRuleCacheableAction<IotDataSinkTcpConfig, IotTcpClient> {
 | 
			
		||||
 | 
			
		||||
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
 | 
			
		||||
    private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Integer getType() {
 | 
			
		||||
        return IotDataSinkTypeEnum.TCP.getType();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected IotTcpClient initProducer(IotDataSinkTcpConfig config) throws Exception {
 | 
			
		||||
        // 1. 参数校验
 | 
			
		||||
        if (config.getHost() == null || config.getHost().trim().isEmpty()) {
 | 
			
		||||
            throw new IllegalArgumentException("TCP 服务器地址不能为空");
 | 
			
		||||
        }
 | 
			
		||||
        if (config.getPort() == null || config.getPort() <= 0 || config.getPort() > 65535) {
 | 
			
		||||
            throw new IllegalArgumentException("TCP 服务器端口无效");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 2. 创建 TCP 客户端
 | 
			
		||||
        IotTcpClient tcpClient = new IotTcpClient(
 | 
			
		||||
                config.getHost(),
 | 
			
		||||
                config.getPort(),
 | 
			
		||||
                config.getConnectTimeoutMs(),
 | 
			
		||||
                config.getReadTimeoutMs(),
 | 
			
		||||
                config.getSsl(),
 | 
			
		||||
                config.getSslCertPath(),
 | 
			
		||||
                config.getDataFormat()
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        // 3. 连接服务器
 | 
			
		||||
        tcpClient.connect();
 | 
			
		||||
 | 
			
		||||
        log.info("[initProducer][TCP 客户端创建并连接成功,服务器: {}:{},SSL: {},数据格式: {}]",
 | 
			
		||||
                config.getHost(), config.getPort(), config.getSsl(), config.getDataFormat());
 | 
			
		||||
 | 
			
		||||
        return tcpClient;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void closeProducer(IotTcpClient producer) throws Exception {
 | 
			
		||||
        if (producer != null) {
 | 
			
		||||
            producer.close();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void execute(IotDeviceMessage message, IotDataSinkTcpConfig config) throws Exception {
 | 
			
		||||
        try {
 | 
			
		||||
            // 1. 获取或创建 TCP 客户端
 | 
			
		||||
            IotTcpClient tcpClient = getProducer(config);
 | 
			
		||||
 | 
			
		||||
            // 2. 检查连接状态,如果断开则重新连接
 | 
			
		||||
            if (!tcpClient.isConnected()) {
 | 
			
		||||
                log.warn("[execute][TCP 连接已断开,尝试重新连接,服务器: {}:{}]", config.getHost(), config.getPort());
 | 
			
		||||
                tcpClient.connect();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // 3. 发送消息并等待结果
 | 
			
		||||
            tcpClient.sendMessage(message);
 | 
			
		||||
 | 
			
		||||
            // 4. 记录发送成功日志
 | 
			
		||||
            log.info("[execute][message({}) config({}) 发送成功,TCP 服务器: {}:{}]",
 | 
			
		||||
                    message, config, config.getHost(), config.getPort());
 | 
			
		||||
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[execute][message({}) config({}) 发送失败,TCP 服务器: {}:{}]",
 | 
			
		||||
                    message, config, config.getHost(), config.getPort(), e);
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,184 @@
 | 
			
		||||
package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
 | 
			
		||||
import javax.net.ssl.SSLSocketFactory;
 | 
			
		||||
import java.io.BufferedReader;
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.io.InputStreamReader;
 | 
			
		||||
import java.io.OutputStream;
 | 
			
		||||
import java.net.InetSocketAddress;
 | 
			
		||||
import java.net.Socket;
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * IoT TCP 客户端
 | 
			
		||||
 * <p>
 | 
			
		||||
 * 负责与外部 TCP 服务器建立连接并发送设备消息
 | 
			
		||||
 * 支持 JSON 和 BINARY 两种数据格式,支持 SSL 加密连接
 | 
			
		||||
 *
 | 
			
		||||
 * @author HUIHUI
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class IotTcpClient {
 | 
			
		||||
 | 
			
		||||
    private final String host;
 | 
			
		||||
    private final Integer port;
 | 
			
		||||
    private final Integer connectTimeoutMs;
 | 
			
		||||
    private final Integer readTimeoutMs;
 | 
			
		||||
    private final Boolean ssl;
 | 
			
		||||
    private final String sslCertPath;
 | 
			
		||||
    private final String dataFormat;
 | 
			
		||||
 | 
			
		||||
    private Socket socket;
 | 
			
		||||
    private OutputStream outputStream;
 | 
			
		||||
    private BufferedReader reader;
 | 
			
		||||
    private final AtomicBoolean connected = new AtomicBoolean(false);
 | 
			
		||||
 | 
			
		||||
    public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
 | 
			
		||||
                        Boolean ssl, String sslCertPath, String dataFormat) {
 | 
			
		||||
        this.host = host;
 | 
			
		||||
        this.port = port;
 | 
			
		||||
        this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000;
 | 
			
		||||
        this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000;
 | 
			
		||||
        this.ssl = ssl != null ? ssl : false;
 | 
			
		||||
        this.sslCertPath = sslCertPath;
 | 
			
		||||
        this.dataFormat = dataFormat != null ? dataFormat : "JSON";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 连接到 TCP 服务器
 | 
			
		||||
     */
 | 
			
		||||
    public void connect() throws Exception {
 | 
			
		||||
        if (connected.get()) {
 | 
			
		||||
            log.warn("[connect][TCP 客户端已经连接,无需重复连接]");
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            if (ssl) {
 | 
			
		||||
                // SSL 连接
 | 
			
		||||
                SSLSocketFactory sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
 | 
			
		||||
                socket = sslSocketFactory.createSocket();
 | 
			
		||||
            } else {
 | 
			
		||||
                // 普通连接
 | 
			
		||||
                socket = new Socket();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // 连接服务器
 | 
			
		||||
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
 | 
			
		||||
            socket.setSoTimeout(readTimeoutMs);
 | 
			
		||||
 | 
			
		||||
            // 获取输入输出流
 | 
			
		||||
            outputStream = socket.getOutputStream();
 | 
			
		||||
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
 | 
			
		||||
 | 
			
		||||
            connected.set(true);
 | 
			
		||||
            log.info("[connect][TCP 客户端连接成功,服务器地址: {}:{}]", host, port);
 | 
			
		||||
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            close();
 | 
			
		||||
            log.error("[connect][TCP 客户端连接失败,服务器地址: {}:{}]", host, port, e);
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送设备消息
 | 
			
		||||
     *
 | 
			
		||||
     * @param message 设备消息
 | 
			
		||||
     * @throws Exception 发送异常
 | 
			
		||||
     */
 | 
			
		||||
    public void sendMessage(IotDeviceMessage message) throws Exception {
 | 
			
		||||
        if (!connected.get()) {
 | 
			
		||||
            throw new IllegalStateException("TCP 客户端未连接");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            String messageData;
 | 
			
		||||
            if ("JSON".equalsIgnoreCase(dataFormat)) {
 | 
			
		||||
                // JSON 格式
 | 
			
		||||
                messageData = JsonUtils.toJsonString(message);
 | 
			
		||||
            } else {
 | 
			
		||||
                // BINARY 格式(这里简化为字符串,实际可能需要自定义二进制协议)
 | 
			
		||||
                messageData = message.toString();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // 发送消息
 | 
			
		||||
            outputStream.write(messageData.getBytes(StandardCharsets.UTF_8));
 | 
			
		||||
            outputStream.write('\n'); // 添加换行符作为消息分隔符
 | 
			
		||||
            outputStream.flush();
 | 
			
		||||
 | 
			
		||||
            log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
 | 
			
		||||
                    message.getDeviceId(), messageData.length());
 | 
			
		||||
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 关闭连接
 | 
			
		||||
     */
 | 
			
		||||
    public void close() {
 | 
			
		||||
        if (!connected.get()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            // 关闭资源
 | 
			
		||||
            if (reader != null) {
 | 
			
		||||
                try {
 | 
			
		||||
                    reader.close();
 | 
			
		||||
                } catch (IOException e) {
 | 
			
		||||
                    log.warn("[close][关闭输入流失败]", e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (outputStream != null) {
 | 
			
		||||
                try {
 | 
			
		||||
                    outputStream.close();
 | 
			
		||||
                } catch (IOException e) {
 | 
			
		||||
                    log.warn("[close][关闭输出流失败]", e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (socket != null) {
 | 
			
		||||
                try {
 | 
			
		||||
                    socket.close();
 | 
			
		||||
                } catch (IOException e) {
 | 
			
		||||
                    log.warn("[close][关闭 Socket 失败]", e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            connected.set(false);
 | 
			
		||||
            log.info("[close][TCP 客户端连接已关闭,服务器地址: {}:{}]", host, port);
 | 
			
		||||
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[close][关闭 TCP 客户端连接异常]", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 检查连接状态
 | 
			
		||||
     *
 | 
			
		||||
     * @return 是否已连接
 | 
			
		||||
     */
 | 
			
		||||
    public boolean isConnected() {
 | 
			
		||||
        return connected.get() && socket != null && !socket.isClosed();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String toString() {
 | 
			
		||||
        return "IotTcpClient{" +
 | 
			
		||||
                "host='" + host + '\'' +
 | 
			
		||||
                ", port=" + port +
 | 
			
		||||
                ", ssl=" + ssl +
 | 
			
		||||
                ", dataFormat='" + dataFormat + '\'' +
 | 
			
		||||
                ", connected=" + connected.get() +
 | 
			
		||||
                '}';
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,161 @@
 | 
			
		||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
 | 
			
		||||
 | 
			
		||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
 | 
			
		||||
import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.MockitoAnnotations;
 | 
			
		||||
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.*;
 | 
			
		||||
import static org.mockito.Mockito.*;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * {@link IotTcpDataRuleAction} 的单元测试
 | 
			
		||||
 *
 | 
			
		||||
 * @author HUIHUI
 | 
			
		||||
 */
 | 
			
		||||
class IotTcpDataRuleActionTest {
 | 
			
		||||
 | 
			
		||||
    private IotTcpDataRuleAction tcpDataRuleAction;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private IotTcpClient mockTcpClient;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setUp() {
 | 
			
		||||
        MockitoAnnotations.openMocks(this);
 | 
			
		||||
        tcpDataRuleAction = new IotTcpDataRuleAction();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testGetType() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        Integer expectedType = 2; // 数据接收类型枚举中 TCP 类型的值
 | 
			
		||||
 | 
			
		||||
        // 调用方法
 | 
			
		||||
        Integer actualType = tcpDataRuleAction.getType();
 | 
			
		||||
 | 
			
		||||
        // 断言结果
 | 
			
		||||
        assertEquals(expectedType, actualType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testInitProducer_Success() throws Exception {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
 | 
			
		||||
        config.setHost("localhost");
 | 
			
		||||
        config.setPort(8080);
 | 
			
		||||
        config.setDataFormat("JSON");
 | 
			
		||||
        config.setSsl(false);
 | 
			
		||||
 | 
			
		||||
        // 调用方法 & 断言结果
 | 
			
		||||
        // 此测试需要实际的 TCP 连接,在单元测试中可能不可用
 | 
			
		||||
        // 目前我们只验证配置是否有效
 | 
			
		||||
        assertNotNull(config.getHost());
 | 
			
		||||
        assertTrue(config.getPort() > 0 && config.getPort() <= 65535);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testInitProducer_InvalidHost() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
 | 
			
		||||
        config.setHost("");
 | 
			
		||||
        config.setPort(8080);
 | 
			
		||||
 | 
			
		||||
        // 调用方法 & 断言结果
 | 
			
		||||
        IotTcpDataRuleAction action = new IotTcpDataRuleAction();
 | 
			
		||||
 | 
			
		||||
        // 测试验证逻辑(通常在 initProducer 方法中)
 | 
			
		||||
        assertThrows(IllegalArgumentException.class, () -> {
 | 
			
		||||
            if (config.getHost() == null || config.getHost().trim().isEmpty()) {
 | 
			
		||||
                throw new IllegalArgumentException("TCP 服务器地址不能为空");
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testInitProducer_InvalidPort() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
 | 
			
		||||
        config.setHost("localhost");
 | 
			
		||||
        config.setPort(-1);
 | 
			
		||||
 | 
			
		||||
        // 调用方法 & 断言结果
 | 
			
		||||
        assertThrows(IllegalArgumentException.class, () -> {
 | 
			
		||||
            if (config.getPort() == null || config.getPort() <= 0 || config.getPort() > 65535) {
 | 
			
		||||
                throw new IllegalArgumentException("TCP 服务器端口无效");
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testCloseProducer() throws Exception {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotTcpClient client = mock(IotTcpClient.class);
 | 
			
		||||
 | 
			
		||||
        // 调用方法
 | 
			
		||||
        tcpDataRuleAction.closeProducer(client);
 | 
			
		||||
 | 
			
		||||
        // 断言结果
 | 
			
		||||
        verify(client, times(1)).close();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testExecute_WithValidConfig() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.report",
 | 
			
		||||
                "{\"temperature\": 25.5, \"humidity\": 60}");
 | 
			
		||||
 | 
			
		||||
        IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
 | 
			
		||||
        config.setHost("localhost");
 | 
			
		||||
        config.setPort(8080);
 | 
			
		||||
        config.setDataFormat("JSON");
 | 
			
		||||
 | 
			
		||||
        // 调用方法 & 断言结果
 | 
			
		||||
        // 通常这需要实际的 TCP 连接
 | 
			
		||||
        // 在单元测试中,我们只验证输入参数
 | 
			
		||||
        assertNotNull(message);
 | 
			
		||||
        assertNotNull(config);
 | 
			
		||||
        assertNotNull(config.getHost());
 | 
			
		||||
        assertTrue(config.getPort() > 0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testConfig_DefaultValues() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
 | 
			
		||||
 | 
			
		||||
        // 调用方法 & 断言结果
 | 
			
		||||
        // 验证默认值
 | 
			
		||||
        assertEquals("JSON", config.getDataFormat());
 | 
			
		||||
        assertEquals(5000, config.getConnectTimeoutMs());
 | 
			
		||||
        assertEquals(10000, config.getReadTimeoutMs());
 | 
			
		||||
        assertEquals(false, config.getSsl());
 | 
			
		||||
        assertEquals(30000L, config.getHeartbeatIntervalMs());
 | 
			
		||||
        assertEquals(5000L, config.getReconnectIntervalMs());
 | 
			
		||||
        assertEquals(3, config.getMaxReconnectAttempts());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testMessageSerialization() {
 | 
			
		||||
        // 准备参数
 | 
			
		||||
        IotDeviceMessage message = IotDeviceMessage.builder()
 | 
			
		||||
                .deviceId(123L)
 | 
			
		||||
                .method("thing.property.report")
 | 
			
		||||
                .params("{\"temperature\": 25.5}")
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        // 调用方法
 | 
			
		||||
        String json = JsonUtils.toJsonString(message);
 | 
			
		||||
 | 
			
		||||
        // 断言结果
 | 
			
		||||
        assertNotNull(json);
 | 
			
		||||
        assertTrue(json.contains("\"deviceId\":123"));
 | 
			
		||||
        assertTrue(json.contains("\"method\":\"thing.property.report\""));
 | 
			
		||||
        assertTrue(json.contains("\"temperature\":25.5"));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user