This commit is contained in:
SwallowGG
2023-10-12 22:22:57 +08:00
parent 23e4dcfac8
commit 2fd6fcfbf9
2 changed files with 89 additions and 39 deletions

View File

@ -22,25 +22,25 @@ public class CacheKey {
}
public static String getTableKey(Long dataSourceId, String databaseName, String schemaName) {
StringBuffer stringBuffer = new StringBuffer("tables_dataSourceId" + dataSourceId);
StringBuffer stringBuffer = new StringBuffer("tables_dataSourceId_" + dataSourceId);
if (!StringUtils.isEmpty(databaseName)) {
stringBuffer.append("_databaseName" + databaseName);
stringBuffer.append("_databaseName_" + databaseName);
}
if (!StringUtils.isEmpty(schemaName)) {
stringBuffer.append("_schemaName" + schemaName);
stringBuffer.append("_schemaName_" + schemaName);
}
return stringBuffer.toString();
}
public static String getColumnKey(Long dataSourceId, String databaseName, String schemaName,String tableName) {
StringBuffer stringBuffer = new StringBuffer("columns_dataSourceId" + dataSourceId);
StringBuffer stringBuffer = new StringBuffer("columns_dataSourceId_" + dataSourceId);
if (!StringUtils.isEmpty(databaseName)) {
stringBuffer.append("_databaseName" + databaseName);
stringBuffer.append("_databaseName_" + databaseName);
}
if (!StringUtils.isEmpty(schemaName)) {
stringBuffer.append("_schemaName" + schemaName);
stringBuffer.append("_schemaName_" + schemaName);
}
stringBuffer.append("_tableName"+tableName);
stringBuffer.append("_tableName_"+tableName);
return stringBuffer.toString();
}
}

View File

@ -136,13 +136,45 @@ public class TableServiceImpl implements TableService {
@Override
public PageResult<Table> pageQuery(TablePageQueryParam param, TableSelector selector) {
LambdaQueryWrapper<TableCacheVersionDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TableCacheVersionDO::getKey, getTableKey(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName()));
String key = getTableKey(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName());
queryWrapper.eq(TableCacheVersionDO::getKey, key);
TableCacheVersionDO versionDO = tableCacheVersionMapper.selectOne(queryWrapper);
long total = 0;
long version = 0L;
if (param.isRefresh() || versionDO == null) {
versionDO = addDBCache(param.getDataSourceId(),param.getDatabaseName(),param.getSchemaName(), versionDO);
version = getLock(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName(), versionDO);
if (version == -1) {
int n = 0;
while (n < 100) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
versionDO = tableCacheVersionMapper.selectOne(queryWrapper);
if (versionDO != null && "1".equals(versionDO.getStatus())) {
version = versionDO.getVersion();
total = versionDO.getTableCount();
break;
}
n++;
}
} else {
total = addDBCache(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName(), version);
TableCacheVersionDO versionDO1 = new TableCacheVersionDO();
versionDO1.setStatus("1");
versionDO1.setTableCount(total);
tableCacheVersionMapper.update(versionDO1, queryWrapper);
}
} else {
if ("2".equals(versionDO.getStatus())) {
version = versionDO.getVersion() - 1;
} else {
version = versionDO.getVersion();
}
total = versionDO.getTableCount();
}
LambdaQueryWrapper<TableCacheDO> query = new LambdaQueryWrapper<>();
query.eq(TableCacheDO::getVersion, versionDO.getVersion());
query.eq(TableCacheDO::getVersion, version);
query.eq(TableCacheDO::getDataSourceId, param.getDataSourceId());
if (StringUtils.isNotBlank(param.getDatabaseName())) {
query.eq(TableCacheDO::getDatabaseName, param.getDatabaseName());
@ -167,19 +199,22 @@ public class TableServiceImpl implements TableService {
tables.add(t);
}
}
return PageResult.of(tables, versionDO.getTableCount(), param);
return PageResult.of(tables, total, param);
}
@Override
public ListResult<SimpleTable> queryTables(TablePageQueryParam param) {
LambdaQueryWrapper<TableCacheVersionDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TableCacheVersionDO::getKey, getTableKey(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName()));
String key = getTableKey(param.getDataSourceId(), param.getDatabaseName(), param.getSchemaName());
queryWrapper.eq(TableCacheVersionDO::getKey, key);
TableCacheVersionDO versionDO = tableCacheVersionMapper.selectOne(queryWrapper);
if (param.isRefresh() || versionDO == null) {
versionDO = addDBCache(param.getDataSourceId(),param.getDatabaseName(), param.getSchemaName(), versionDO);
if(versionDO == null){
return ListResult.of(Lists.newArrayList());
}
long version = "2".equals(versionDO.getStatus()) ? versionDO.getVersion() - 1 : versionDO.getVersion();
LambdaQueryWrapper<TableCacheDO> query = new LambdaQueryWrapper<>();
query.eq(TableCacheDO::getVersion, versionDO.getVersion());
query.eq(TableCacheDO::getVersion, version);
query.eq(TableCacheDO::getDataSourceId, param.getDataSourceId());
if (StringUtils.isNotBlank(param.getDatabaseName())) {
query.eq(TableCacheDO::getDatabaseName, param.getDatabaseName());
@ -189,8 +224,8 @@ public class TableServiceImpl implements TableService {
}
List<SimpleTable> tables = new ArrayList<>();
for (int i = 0; i < versionDO.getTableCount()/500 +1; i++) {
Page<TableCacheDO> page = new Page<>(i+1, 500);
for (int i = 0; i < versionDO.getTableCount() / 500 + 1; i++) {
Page<TableCacheDO> page = new Page<>(i + 1, 500);
IPage<TableCacheDO> iPage = tableCacheMapper.selectPage(page, query);
if (CollectionUtils.isNotEmpty(iPage.getRecords())) {
for (TableCacheDO tableCacheDO : iPage.getRecords()) {
@ -204,23 +239,9 @@ public class TableServiceImpl implements TableService {
return ListResult.of(tables);
}
private TableCacheVersionDO addDBCache(Long dataSourceId,String databaseName,String schemaName, TableCacheVersionDO versionDO) {
private long addDBCache(Long dataSourceId, String databaseName, String schemaName, long version) {
String key = getTableKey(dataSourceId, databaseName, schemaName);
if (versionDO == null) {
versionDO = new TableCacheVersionDO();
versionDO.setDatabaseName(databaseName);
versionDO.setSchemaName(schemaName);
versionDO.setDataSourceId(dataSourceId);
versionDO.setStatus("2");
versionDO.setKey(key);
versionDO.setVersion(0L);
versionDO.setTableCount(0L);
tableCacheVersionMapper.insert(versionDO);
} else {
versionDO.setVersion(versionDO.getVersion() + 1);
versionDO.setStatus("2");
tableCacheVersionMapper.updateById(versionDO);
}
Connection connection = Chat2DBContext.getConnection();
long n = 0;
try (ResultSet resultSet = connection.getMetaData().getTables(databaseName, schemaName, null,
@ -233,7 +254,7 @@ public class TableServiceImpl implements TableService {
tableCacheDO.setTableName(resultSet.getString("TABLE_NAME"));
tableCacheDO.setExtendInfo(resultSet.getString("REMARKS"));
tableCacheDO.setDataSourceId(dataSourceId);
tableCacheDO.setVersion(versionDO.getVersion());
tableCacheDO.setVersion(version);
tableCacheDO.setKey(key);
cacheDOS.add(tableCacheDO);
if (cacheDOS.size() >= 500) {
@ -245,12 +266,9 @@ public class TableServiceImpl implements TableService {
if (!CollectionUtils.isEmpty(cacheDOS)) {
tableCacheMapper.batchInsert(cacheDOS);
}
versionDO.setStatus("1");
versionDO.setTableCount(n);
tableCacheVersionMapper.updateById(versionDO);
LambdaQueryWrapper<TableCacheDO> q = new LambdaQueryWrapper();
q.eq(TableCacheDO::getDataSourceId, dataSourceId);
q.lt(TableCacheDO::getVersion, versionDO.getVersion());
q.lt(TableCacheDO::getVersion, version);
if (StringUtils.isNotBlank(databaseName)) {
q.eq(TableCacheDO::getDatabaseName, databaseName);
}
@ -261,8 +279,40 @@ public class TableServiceImpl implements TableService {
} catch (SQLException e) {
throw new RuntimeException(e);
}
return versionDO;
return n;
}
private Long getLock(Long dataSourceId, String databaseName, String schemaName, TableCacheVersionDO versionDO) {
String key = getTableKey(dataSourceId, databaseName, schemaName);
if (versionDO == null) {
versionDO = new TableCacheVersionDO();
versionDO.setDatabaseName(databaseName);
versionDO.setSchemaName(schemaName);
versionDO.setDataSourceId(dataSourceId);
versionDO.setStatus("2");
versionDO.setKey(key);
versionDO.setVersion(0L);
versionDO.setTableCount(0L);
try {
tableCacheVersionMapper.insert(versionDO);
return 0L;
} catch (Exception e) {
return -1L;
}
} else {
long version = versionDO.getVersion() + 1;
LambdaQueryWrapper<TableCacheVersionDO> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(TableCacheVersionDO::getId, versionDO.getId());
queryWrapper.eq(TableCacheVersionDO::getVersion, versionDO.getVersion());
versionDO.setVersion(version);
versionDO.setStatus("2");
int n = tableCacheVersionMapper.update(versionDO, queryWrapper);
if (n == 1) {
return version;
} else {
return -1L;
}
}
}