评论

收藏

[NoSQL] spring boot使用IoTDB的两种方式

数据库 数据库 发布于:2021-12-17 23:11 | 阅读数:543 | 评论:0

InfluxDB和IotDB介绍与性能对比
Linux MacBook Docker安装IoTDB及使用
方式一: session方式访问IoTDB (推荐使用,自带连接池)
maven依赖iotdb-session
<dependency>
     <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-session</artifactId>
     <version>0.11.2</version>
</dependency>
springboot IotDB配置信息session方式
spring:
  iotdb:
    username: root
    password: root
    ip: 192.168.0.5
    port: 6667
    maxSize: 100
IotDB-session配置类
package com.beyond.data.config;
import org.apache.iotdb.session.pool.SessionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Spring-managed helper that writes records to IoTDB through a shared {@link SessionPool}.
 *
 * <p>Connection settings come from the {@code spring.iotdb.*} properties (each with a default).
 * The pool is created lazily on first use and kept in a static field, so every caller shares
 * the same pool instance.
 */
@Component
@Configuration
public class IotDBSessionConfig {
  private static final Logger log = LoggerFactory.getLogger(IotDBSessionConfig.class);

  @Value("${spring.iotdb.username:root}")
  private String username;
  @Value("${spring.iotdb.password:root}")
  private String password;
  @Value("${spring.iotdb.ip:127.0.0.1}")
  private String ip;
  @Value("${spring.iotdb.port:6667}")
  private int port;
  @Value("${spring.iotdb.maxSize:10}")
  private int maxSize;

  /** Shared pool; volatile so a fully constructed instance is visible to all threads. */
  private static volatile SessionPool sessionPool;

  /**
   * Returns the shared session pool, creating it on first use.
   *
   * <p>Uses double-checked locking so that concurrent first callers cannot each create (and
   * then leak) their own pool.
   *
   * @return the single shared {@link SessionPool}
   */
  private SessionPool getSessionPool() {
    SessionPool pool = sessionPool;
    if (pool == null) {
      synchronized (IotDBSessionConfig.class) {
        pool = sessionPool;
        if (pool == null) {
          pool = new SessionPool(ip, port, username, password, maxSize);
          sessionPool = pool;
        }
      }
    }
    return pool;
  }

  /**
   * Writes one record (a set of measurement values at a single timestamp) for a device.
   *
   * <p>This is best-effort: any exception raised by the write is logged together with its
   * stack trace and is not rethrown, so callers are not notified of a failed write.
   *
   * @param deviceId     device path the record belongs to
   * @param time         timestamp of the record
   * @param measurements measurement names, in the same order as {@code values}
   * @param values       measurement values as strings, in the same order as {@code measurements}
   */
  public void insertRecord(String deviceId, long time, List<String> measurements, List<String> values) {
    SessionPool pool = getSessionPool();
    try {
      log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
      pool.insertRecord(deviceId, time, measurements, values);
    } catch (Exception e) {
      // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
      log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
          deviceId, time, measurements, values, e.getMessage(), e);
    }
  }
}
调用session方式
@Autowired
private IotDBSessionConfig iotDBSessionConfig;
......
// Device path the record is written to; the pieces below concatenate to "root.testdeviceid".
// NOTE(review): there is no '.' between "test" and "deviceid" - confirm whether
// "root.test.deviceid" was intended.
StringBuilder devicePath = new StringBuilder("root");
devicePath.append(".");
devicePath.append("test");
devicePath.append("deviceid");
// Timestamp of the record, in epoch milliseconds.
long currentTime = System.currentTimeMillis();
// Measurement names and their values are matched by position.
List<String> iotMeasurements = new ArrayList<>();
List<String> iotValues = new ArrayList<>();
iotMeasurements.add("aaa");
iotValues.add("123");
iotMeasurements.add("bbb");
iotValues.add("abide");
String deviceId = devicePath.toString();
iotDBSessionConfig.insertRecord(deviceId, currentTime, iotMeasurements, iotValues);
方式二: jdbc方式访问IoTDB (需自行配置连接池,本文使用druid)
maven依赖iotdb-jdbc
<dependency>
   <groupId>org.apache.iotdb</groupId>
   <artifactId>iotdb-jdbc</artifactId>
   <version>0.11.2</version>
</dependency>
 <!-- alibaba的druid数据库连接池 -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid-spring-boot-starter</artifactId>
</dependency>
spring boot IotDB配置信息jdbc
spring:
  iotdb:
    username: root
    password: root
    driver-name: org.apache.iotdb.jdbc.IoTDBDriver
    url: jdbc:iotdb://192.168.0.5:6667/
    initial-size: 5
    min-idle: 10
    max-active: 50
    max-wait: 60000
    remove-abandoned: true
    remove-abandoned-timeout: 30
    time-between-eviction-runs-millis: 60000
    min-evictable-idle-time-millis: 300000
    test-while-idle: false
    test-on-borrow: false
    test-on-return: false
IotDB-jdbc配置类
package com.beyond.data.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Spring-managed helper that runs SQL against IoTDB over JDBC, using a Druid connection pool.
 *
 * <p>Pool settings come from the {@code spring.iotdb.*} properties. The pool is created lazily
 * on first use and kept in a static field, so every caller shares the same data source.
 */
@Component
@Configuration
public class IotDBConfig {
  private static final Logger log = LoggerFactory.getLogger(IotDBConfig.class);

  @Value("${spring.iotdb.username}")
  private String username;
  @Value("${spring.iotdb.password}")
  private String password;
  @Value("${spring.iotdb.driver-name}")
  private String driverName;
  @Value("${spring.iotdb.url}")
  private String url;
  @Value("${spring.iotdb.initial-size:20}")
  private int initialSize;
  @Value("${spring.iotdb.min-idle:10}")
  private int minIdle;
  @Value("${spring.iotdb.max-active:500}")
  private int maxActive;
  @Value("${spring.iotdb.max-wait:60000}")
  private int maxWait;
  @Value("${spring.iotdb.remove-abandoned:true}")
  private boolean removeAbandoned;
  @Value("${spring.iotdb.remove-abandoned-timeout:30}")
  private int removeAbandonedTimeout;
  @Value("${spring.iotdb.time-between-eviction-runs-millis:60000}")
  private int timeBetweenEvictionRunsMillis;
  @Value("${spring.iotdb.min-evictable-idle-time-millis:300000}")
  private int minEvictableIdleTimeMillis;
  @Value("${spring.iotdb.test-while-idle:false}")
  private boolean testWhileIdle;
  @Value("${spring.iotdb.test-on-borrow:false}")
  private boolean testOnBorrow;
  @Value("${spring.iotdb.test-on-return:false}")
  private boolean testOnReturn;

  /** Shared pool; volatile so only a fully configured instance is ever visible to other threads. */
  private static volatile DruidDataSource iotDbDataSource;

  /**
   * Returns the shared Druid data source, creating and configuring it on first use.
   *
   * <p>The instance is configured completely before it is published to the static field, and
   * creation is guarded by double-checked locking, so no thread can borrow a connection from a
   * half-configured pool and no second pool is created by a concurrent first call.
   */
  private DruidDataSource getDataSource() {
    DruidDataSource dataSource = iotDbDataSource;
    if (dataSource == null) {
      synchronized (IotDBConfig.class) {
        dataSource = iotDbDataSource;
        if (dataSource == null) {
          dataSource = buildDataSource();
          iotDbDataSource = dataSource;
        }
      }
    }
    return dataSource;
  }

  /** Builds a Druid data source from the injected {@code spring.iotdb.*} settings. */
  private DruidDataSource buildDataSource() {
    DruidDataSource dataSource = new DruidDataSource();
    // Connection parameters.
    dataSource.setUrl(url);
    dataSource.setDriverClassName(driverName);
    dataSource.setUsername(username);
    dataSource.setPassword(password);
    // Initial, minimum-idle and maximum pool sizes.
    dataSource.setInitialSize(initialSize);
    dataSource.setMinIdle(minIdle);
    dataSource.setMaxActive(maxActive);
    // Maximum time to wait when borrowing a connection.
    dataSource.setMaxWait(maxWait);
    // Connection-leak detection.
    dataSource.setRemoveAbandoned(removeAbandoned);
    dataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
    // Interval, in milliseconds, between checks for idle connections that should be closed.
    dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
    dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
    // Connection validation switches.
    dataSource.setTestWhileIdle(testWhileIdle);
    dataSource.setTestOnBorrow(testOnBorrow);
    dataSource.setTestOnReturn(testOnReturn);
    return dataSource;
  }

  /**
   * Borrows a connection from the Druid pool.
   *
   * @return a pooled connection, or {@code null} if one could not be obtained (the failure is logged)
   */
  private Connection getConnection() {
    try {
      return getDataSource().getConnection();
    } catch (SQLException e) {
      log.error("iotDB getConnection失败: error={}", e.getMessage(), e);
      return null;
    }
  }

  /**
   * Executes a single SQL statement that returns no rows (for example an INSERT).
   *
   * <p>This is best-effort: an {@link SQLException} is logged and not rethrown, and nothing is
   * executed when no connection could be obtained. The SQL text is executed as given.
   *
   * @param sql complete SQL statement to execute
   */
  public void insert(String sql) {
    log.info("iotDB insert sql={}", sql);
    Connection connection = getConnection();
    if (connection == null) {
      // getConnection() has already logged the failure.
      return;
    }
    Statement statement = null;
    try {
      statement = connection.createStatement();
      long startMillis = System.currentTimeMillis();
      statement.execute(sql);
      log.info("执行IotDb的sql----[{}],时间:[{}]ms", sql, System.currentTimeMillis() - startMillis);
    } catch (SQLException e) {
      log.error("iotDB insert失败: error={}", e.getMessage(), e);
    } finally {
      close(statement, connection);
    }
  }

  /**
   * Executes a query and returns its rows as maps keyed by simplified column name.
   *
   * @param sql complete SQL query to execute
   * @return one map per row (possibly an empty list), or {@code null} when no connection could
   *     be obtained or the query failed with an {@link SQLException}; callers must null-check
   */
  public List<Map<String, Object>> query(String sql) {
    Connection connection = getConnection();
    if (connection == null) {
      // getConnection() has already logged the failure.
      return null;
    }
    Statement statement = null;
    ResultSet resultSet = null;
    List<Map<String, Object>> resultList = null;
    try {
      statement = connection.createStatement();
      long startMillis = System.currentTimeMillis();
      resultSet = statement.executeQuery(sql);
      log.info("查询IotDb的sql----[{}],时间:[{}]ms", sql,System.currentTimeMillis()-startMillis);
      resultList = outputResult(resultSet);
    } catch (SQLException e) {
      log.error("iotDB query失败: error={}", e.getMessage(), e);
    } finally {
      if (resultSet != null) {
        try {
          resultSet.close();
        } catch (SQLException e) {
          log.error("iotDB resultSet关闭异常: error={}", e.getMessage(), e);
        }
      }
      close(statement, connection);
    }
    return resultList;
  }

  /**
   * Converts a result set into a list of row maps.
   *
   * <p>Column names are simplified by {@link #normalizeColumnLabel(String)}. A column whose
   * simplified name is {@code Time} is formatted as {@code yyyy-MM-dd HH:mm:ss.SSS}; every other
   * column is stored as the string returned by {@link ResultSet#getString(int)}.
   *
   * @param resultSet result set to read; may be {@code null}
   * @return one map per row; empty when {@code resultSet} is {@code null} or has no rows
   * @throws SQLException if reading the result set fails
   */
  private List<Map<String, Object>> outputResult(ResultSet resultSet) throws SQLException {
    List<Map<String, Object>> resultList = new ArrayList<>();
    if (resultSet == null) {
      return resultList;
    }
    ResultSetMetaData metaData = resultSet.getMetaData();
    int columnCount = metaData.getColumnCount();
    // Column labels do not change between rows, so simplify them once (index 0 is unused).
    String[] columnNames = new String[columnCount + 1];
    for (int i = 1; i <= columnCount; i++) {
      columnNames[i] = normalizeColumnLabel(metaData.getColumnLabel(i));
    }
    // SimpleDateFormat is not thread-safe, so keep it local to this call.
    SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    while (resultSet.next()) {
      Map<String, Object> row = new HashMap<>();
      for (int i = 1; i <= columnCount; i++) {
        String value = resultSet.getString(i);
        if ("Time".equals(columnNames[i])) {
          putFormattedTime(row, value, timeFormat);
        } else {
          row.put(columnNames[i], value);
        }
      }
      resultList.add(row);
    }
    return resultList;
  }

  /**
   * Simplifies a column label: keeps only the text after the last {@code '.'} and cuts the
   * result at its last {@code ')'}, so a label such as {@code count(root.test.aaa)} becomes
   * {@code aaa}.
   */
  private static String normalizeColumnLabel(String label) {
    String name = label;
    int lastDot = name.lastIndexOf('.');
    if (lastDot >= 0) {
      name = name.substring(lastDot + 1);
    }
    int lastParen = name.lastIndexOf(')');
    if (lastParen >= 0) {
      name = name.substring(0, lastParen);
    }
    return name;
  }

  /**
   * Stores the {@code Time} entry of a row.
   *
   * <p>A positive epoch-millisecond value is stored formatted; a non-positive or {@code null}
   * value is not stored at all; a value that is not a number is stored unchanged instead of
   * aborting the whole query with a {@link NumberFormatException}.
   */
  private static void putFormattedTime(Map<String, Object> row, String rawTime, SimpleDateFormat timeFormat) {
    if (rawTime == null) {
      return;
    }
    long millis;
    try {
      millis = Long.parseLong(rawTime);
    } catch (NumberFormatException e) {
      row.put("Time", rawTime);
      return;
    }
    if (millis > 0) {
      row.put("Time", timeFormat.format(new Date(millis)));
    }
  }

  /**
   * Closes the statement and then the connection, each independently, so that a failure while
   * closing the statement can no longer prevent the connection from being returned to the pool.
   * Failures are logged and not rethrown.
   *
   * @param statement  statement to close; may be {@code null}
   * @param connection connection to close; may be {@code null}
   */
  private void close(Statement statement, Connection connection) {
    if (statement != null) {
      try {
        statement.close();
      } catch (Exception e) {
        log.error("iotDB close失败: error={}", e.getMessage(), e);
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception e) {
        log.error("iotDB close失败: error={}", e.getMessage(), e);
      }
    }
  }
}
调用jdbc方式
@Autowired
private IotDBConfig iotDBConfig;
......
// Device path used by both statements; the pieces below concatenate to "root.testdeviceid".
// NOTE(review): there is no '.' between "test" and "deviceid" - confirm whether
// "root.test.deviceid" was intended.
StringBuilder tableName = new StringBuilder();
tableName.append("root").append(".").append("test").append("deviceid");
long currentTime = System.currentTimeMillis();
// Measurement names and their SQL value literals are matched by position.
List<String> iotMeasurements = new ArrayList<>();
iotMeasurements.add("aaa");
iotMeasurements.add("bbb");
List<String> iotValues = new ArrayList<>();
iotValues.add("123");
// The values are spliced into the SQL text verbatim, so a text value has to be written as a
// quoted literal here, matching the quoted 'abde' used by the query below.
iotValues.add("'abde'");
// Builds: insert into <device>(timestamp,aaa,bbb) values(<millis>,123,'abde')
StringBuilder sql = new StringBuilder();
sql.append(" insert into ").append(tableName);
sql.append("(timestamp,");
sql.append(String.join(",", iotMeasurements)).append(")");
sql.append(" values(").append(currentTime).append(",");
sql.append(String.join(",", iotValues)).append(")");
iotDBConfig.insert(sql.toString());
// Query the most recent "aaa" value among rows whose "bbb" equals 'abde'.
StringBuilder querySql = new StringBuilder();
querySql.append(" select ").append("aaa");
querySql.append(" from ").append(tableName);
querySql.append(" where ").append("bbb").append(" = '");
querySql.append("abde").append("'");
querySql.append(" order by time desc limit 1 ");
log.info("sql----{}",  querySql);
// query(...) returns null when the connection or the query fails, so null-check before use.
List<Map<String, Object>> resultList = iotDBConfig.query(querySql.toString());
</div>
    
    <div id="asideoffset"></div>

关注下面的标签,发现更多相似文章