package com.yuce.data.cube.service.worktable;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yuce.asset.model.enums.FieldLogicalTypeEnum;
import com.yuce.base.exception.YuceExceptionBase;
import com.yuce.base.util.JsonUtil;
import com.yuce.base.util.UUIDUtil;
import com.yuce.cube.event.ResourceObjectEvent;
import com.yuce.cube.shield.model.bo.ColumnPermissionBO;
import com.yuce.cube.shield.model.dto.ObjectAuthDTO;
import com.yuce.cube.shield.model.enums.PermissionObjectTypeEnum;
import com.yuce.cube.shield.service.ObjectAuthService;
import com.yuce.data.common.config.CacheConstants;
import com.yuce.data.common.config.CubeConfig;
import com.yuce.data.common.config.CubeSourceFactory;
import com.yuce.data.common.exception.ObjectNotExistException;
import com.yuce.data.common.model.collection.DataCollection;
import com.yuce.data.common.model.enums.CollectionSourceEnum;
import com.yuce.data.common.model.enums.CubeCategoryEnum;
import com.yuce.data.common.model.enums.OperatorEnum;
import com.yuce.data.common.model.field.Field;
import com.yuce.data.common.model.field.FieldAttrs;
import com.yuce.data.common.task.TableFieldMappingForMigrateDTO;
import com.yuce.data.common.util.ThreadLocalUtils;
import com.yuce.data.cube.dao.WorkTableFieldDao;
import com.yuce.data.cube.model.bo.analysis.field.CollectionField;
import com.yuce.data.cube.model.bo.table.TableField;
import com.yuce.data.cube.model.bo.table.UserDefinedTableSchema;
import com.yuce.data.cube.model.bo.table.WorkTable;
import com.yuce.data.cube.model.dataobject.WorkTableFieldDO;
import com.yuce.data.cube.model.enums.AssetRetrieveType;
import com.yuce.data.cube.model.enums.RetrieveWordSourceType;
import com.yuce.data.cube.model.enums.TableStorageMediumEnum;
import com.yuce.data.cube.service.analysis.DataCollectionService;
import com.yuce.data.cube.service.analysis.field.CollectionFieldService;
import com.yuce.data.cube.service.asset.AssetRetrieveService;
import com.yuce.data.cube.service.datasource.DataSourceManager;
import com.yuce.data.cube.util.TableOperator;
import com.yuce.data.rawdb.DBOperator;
import com.yuce.data.rawdb.DBOperatorFactory;
import com.yuce.data.rawdb.DataSourceConfig;
import com.yuce.data.rawdb.StarrocksFieldTypeEnum;
import com.yuce.data.rawdb.parser.SqlParser;
import com.yuce.data.rawdb.parser.SqlParserFactory;
import com.yuce.data.rawdb.schema.FieldTypeEnum;
import com.yuce.data.rawdb.schema.RawFieldDesc;
import com.yuce.data.rawdb.schema.RawTableSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.BooleanUtils;
import org.eclipse.jetty.util.StringUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
@Slf4j
public class WorkTableFieldServiceImpl implements WorkTableFieldService {
@Autowired
CacheManager cacheManager;
@Autowired
private ObjectAuthService objectAuthService;
@Autowired
private WorkTableFieldDao rawTableFieldDao;
@Autowired
private AssetRetrieveService assetRetrieveService;
@Lazy
@Autowired(required = false)
private WorkTableService workTableService;
@Lazy
@Autowired(required = false)
private DataSourceManager dataSourceManager;
@Autowired
private DataCollectionService dataCollectionService;
@Lazy
@Autowired
private CollectionFieldService collectionFieldService;
@Autowired
private ApplicationContext applicationContext;
@Override
public List<Field> getAllTableFields(String tableId) {
return getAllTableFields(tableId, false);
}
@Override
public List<Field> getAllTableFields(String tableId, boolean getRawFieldType) {
List<Field> fields = Lists.newArrayList();
QueryWrapper fieldQuery = new QueryWrapper();
fieldQuery.eq("table_id", tableId);
fieldQuery.orderByAsc("pos");
List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(fieldQuery);
for (WorkTableFieldDO fieldDO : fieldDOS) {
TableField field = convertToFieldVO(fieldDO);
fields.add(field);
}
if (getRawFieldType) {
WorkTable workTable = workTableService.loadWorkTableById(tableId);
if (workTable != null) {
try {
RawTableSchema tableSchema = TableOperator.getTableOperator().getTableSchema(workTable.getStorageMedium(), workTable.getTableName());
Map<String, RawFieldDesc> rawFieldDescMap = tableSchema.getFields().stream()
.collect(Collectors.toMap(RawFieldDesc::getColumnName, Function.identity()));
for (Field field : fields) {
if (rawFieldDescMap.containsKey(field.getFieldName())) {
RawFieldDesc rawFieldDesc = rawFieldDescMap.get(field.getFieldName());
field.setFieldType(rawFieldDesc.getDataType());
field.setStarrocksFieldTypeEnum(StarrocksFieldTypeEnum.findEnumByDataType(field.getFieldType()));
}
}
} catch (Exception e) {
log.warn("", e);
}
}
}
return fields;
}
@Override
public List<Field> getValidTableFields(String tableId) {
List<Field> fieldVOS = Lists.newArrayList();
QueryWrapper fieldQuery = new QueryWrapper();
fieldQuery.eq("table_id", tableId);
fieldQuery.eq("status", 0);
fieldQuery.orderByAsc("pos");
List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(fieldQuery);
for (WorkTableFieldDO fieldDO : fieldDOS) {
TableField field = convertToFieldVO(fieldDO);
fieldVOS.add(field);
}
return fieldVOS;
}
@Override
public List<Field> getTableFieldsHavingPermissions(String tableId, String userId) {
List<Field> tableFields = getValidTableFields(tableId);
tableFields = filterTableUnanalysizableFields(tableId, tableFields, userId);
return tableFields;
}
protected List<Field> filterTableUnanalysizableFields(String tableId, List<Field> tableFields, String owner) {
ObjectAuthDTO authDTO = objectAuthService.getUserObjectAuthPermissionDTO(PermissionObjectTypeEnum.TABLE, tableId, owner);
if (authDTO != null
&& !authDTO.isAdmin()
&& authDTO.isEnableColumnPermission()) {
ColumnPermissionBO columnPermission = authDTO.getColumnPermission();
Set<String> analysizableFieldIds = Sets.newHashSet();
if (columnPermission != null) {
analysizableFieldIds.addAll(columnPermission.getAnalyzableColumns());
}
tableFields = tableFields.stream()
.filter(field -> analysizableFieldIds.contains(field.getId()))
.collect(Collectors.toList());
}
return tableFields;
}
@Override
public void deleteTableFields(String tableId) {
UpdateWrapper<WorkTableFieldDO> wrapper = new UpdateWrapper<WorkTableFieldDO>();
wrapper.eq("table_id", tableId);
rawTableFieldDao.delete(wrapper);
}
@Override
public TableField loadFieldById(String id) {
return convertToFieldVO(rawTableFieldDao.selectById(id));
}
@Override
public void setTableDisplayFields(String tableId, List<String> validFields) {
log.info("start to set valid fields {} on table {}", validFields, tableId);
QueryWrapper fieldQuery = new QueryWrapper();
fieldQuery.eq("table_id", tableId);
fieldQuery.orderByAsc("pos");
Set<String> fieldSet = Sets.newHashSet(validFields);
List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(fieldQuery);
List<WorkTableFieldDO> tosetValidFields = Lists.newArrayList();
List<WorkTableFieldDO> tosetInValidFields = Lists.newArrayList();
for (WorkTableFieldDO fieldDO : fieldDOS) {
if (fieldSet.contains(fieldDO.getId())) {
tosetValidFields.add(fieldDO);
} else {
tosetInValidFields.add(fieldDO);
}
}
if (CollectionUtils.isEmpty(tosetValidFields)) {
throw new YuceExceptionBase("重置字段后,表" + tableId + "中没有有效字段,忽略该操作");
}
for (WorkTableFieldDO validFieldDO : tosetValidFields) {
if (validFieldDO.getStatus().equals(Field.STATUS_DISPLAY)) {
continue;
}
validFieldDO.setStatus(Field.STATUS_DISPLAY);
log.info("start to set valid status for field {}", validFieldDO.toString());
rawTableFieldDao.updateById(validFieldDO);
assetRetrieveService.addOrUpdateAssetRetrieveWord(validFieldDO.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
tableId, AssetRetrieveType.WORKTABLE, validFieldDO.getAliasFieldName());
}
for (WorkTableFieldDO invalidFieldDO : tosetInValidFields) {
if (invalidFieldDO.getStatus().equals(Field.STATUS_HIDE)) {
continue;
}
invalidFieldDO.setStatus(Field.STATUS_HIDE);
log.info("start to set invalid status for field {}", invalidFieldDO.toString());
rawTableFieldDao.updateById(invalidFieldDO);
assetRetrieveService.deleteRetrievalWordBySourceId(invalidFieldDO.getId());
}
}
@Override
public void validField(String tableId, String fieldId) {
log.info("start to valid field {}", fieldId);
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (!Field.STATUS_DISPLAY.equals(fieldDO.getStatus())) {
fieldDO.setStatus(Field.STATUS_DISPLAY);
rawTableFieldDao.updateById(fieldDO);
assetRetrieveService.addOrUpdateAssetRetrieveWord(fieldDO.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
fieldDO.getTableId(), AssetRetrieveType.WORKTABLE, fieldDO.getAliasFieldName());
}
}
@Override
public void invalidField(String tableId, String fieldId) {
log.info("start to invalid field {}", fieldId);
UpdateWrapper updateWrapper = new UpdateWrapper();
updateWrapper.eq("id", fieldId);
updateWrapper.set("status", 1);
rawTableFieldDao.update(null, updateWrapper);
assetRetrieveService.deleteRetrievalWordBySourceId(fieldId);
}
@Override
public void invalidFieldByFieldName(String tableId, String fieldName) {
log.info("start to invalid field tableId : {}, fieldName:{}", tableId, fieldName);
QueryWrapper<WorkTableFieldDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("table_id", tableId);
queryWrapper.eq("field_name", fieldName);
List<WorkTableFieldDO> fieldList = rawTableFieldDao.selectList(queryWrapper);
if (CollectionUtils.isEmpty(fieldList)) {
throw new ObjectNotExistException("表" + tableId + "中没有字段" + fieldName);
}
WorkTableFieldDO fieldDO = fieldList.get(0);
fieldDO.setStatus(Field.STATUS_HIDE);
rawTableFieldDao.updateById(fieldDO);
assetRetrieveService.deleteRetrievalWordBySourceId(fieldDO.getId());
}
@Override
public void updateFieldAliasName(String tableId, String fieldId, String fieldAliasName) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("表字段" + fieldId + "不存在");
}
fieldDO.setAliasFieldName(fieldAliasName);
rawTableFieldDao.updateById(fieldDO);
assetRetrieveService.addOrUpdateAssetRetrieveWord(fieldId, RetrieveWordSourceType.WORKTABLE_FIELD,
fieldDO.getTableId(), AssetRetrieveType.WORKTABLE, fieldAliasName);
WorkTable workTable = workTableService.loadWorkTableById(tableId);
modifyCubeTableFieldComment(workTable, fieldDO.getFieldName(), fieldAliasName);
}
private void modifyCubeTableFieldComment(WorkTable workTable, String fieldName, String fieldComment) {
if (workTable == null) {
return;
}
DataSourceConfig dataSourceConfig = dataSourceManager.getCubeWriterDataSource(workTable.getStorageMedium());
if (dataSourceConfig == null) {
return;
}
SqlParser sqlParser = SqlParserFactory.getSqlParser(dataSourceConfig.getSourceTypeEnum());
DBOperator dbOperator = DBOperatorFactory.getDBOperator(dataSourceConfig);
if (CubeConfig.isEnableDremio()) {
return;
}
String commentSql = String.format("comment on column %s.%s is '%s'",
sqlParser.normalizeTableName(workTable.getTableSchema(), workTable.getTableName()),
sqlParser.normalizeName(fieldName), fieldComment);
try {
if (dbOperator.exists(workTable.getTableSchema(), workTable.getTableName())) {
dbOperator.execute(commentSql);
}
} catch (Exception e) {
log.warn("", e);
}
}
@Override
public void updateFieldType(String tableId, String fieldId, FieldTypeEnum fieldType) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("表字段" + fieldId + "不存在");
}
fieldDO.setOrigFieldType(fieldDO.getOrigFieldType());
fieldDO.setFieldTypeEnum(fieldType.getCode());
rawTableFieldDao.updateById(fieldDO);
}
@Override
public void updateFieldLogicalType(String tableId, String fieldId, FieldLogicalTypeEnum fieldLogicalTypeEnum) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("表字段" + fieldId + "不存在");
}
fieldDO.addAttr("fieldLogicalType", fieldLogicalTypeEnum.getCode());
rawTableFieldDao.updateById(fieldDO);
}
@Override
public void updateFieldComment(String tableId, String fieldId, String comment) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("表字段" + fieldId + "不存在");
}
fieldDO.setComment(comment);
fieldDO.addAttr(FieldAttrs.COMMENT, comment);
rawTableFieldDao.updateById(fieldDO);
}
@Override
public void adjustFieldPos(String tableId, String fieldId, String aheadFieldId) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("字段" + fieldId + "不存在");
}
Integer curPos = rawTableFieldDao.getFieldPos(aheadFieldId);
if (curPos == null) {
curPos = 0;
}
rawTableFieldDao.plusPos(fieldDO.getTableId(), curPos);
fieldDO.setPos(curPos + 1);
rawTableFieldDao.updateById(fieldDO);
}
@Override
public void addWorkTableFieldsByRawDescs(String tableId, List<RawFieldDesc> rawFieldDescs) {
if (CollectionUtils.isEmpty(rawFieldDescs)) {
return;
}
List<Field> fields = rawFieldDescs.stream()
.map(fieldDesc -> convertRawDescToField(tableId, fieldDesc, null))
.collect(Collectors.toList());
addWorkTableFields(tableId, fields);
clearTableFieldCache(tableId);
}
@Override
public void addWorkTableFieldsByMapping(String tableId, RawTableSchema tableSchema, List<TableFieldMappingForMigrateDTO> tableFieldMappingList) {
if (StringUtil.isBlank(tableId) || CollectionUtils.isEmpty(tableFieldMappingList)) {
return;
}
if (tableSchema == null) {
WorkTable workTable = workTableService.loadWorkTableById(tableId);
tableSchema = dataSourceManager.getTableSchema(workTable.getSourceId(), workTable.getSrcTableName());
}
List<Field> oldFieldList = getAllTableFields(tableId);
Map<String, Field> oldFieldMap = null;
Map<String, String> deleteFieldMap = null;
if (!CollectionUtils.isEmpty(oldFieldList)) {
oldFieldMap = oldFieldList.stream().collect(Collectors.toMap(Field::getFieldName, Function.identity()));
deleteFieldMap = oldFieldList.stream().collect(Collectors.toMap(Field::getFieldName, Field::getId));
} else {
oldFieldMap = Maps.newHashMap();
deleteFieldMap = Maps.newHashMap();
}
List<RawFieldDesc> rawFieldDescs = tableSchema.getFields();
Map<String, RawFieldDesc> rawFieldDescMap = rawFieldDescs.stream().collect(Collectors.toMap(RawFieldDesc::getColumnName, Function.identity()));
List<Field> addFieldList = Lists.newArrayList();
for (TableFieldMappingForMigrateDTO tableFieldMappingDTO : tableFieldMappingList) {
RawFieldDesc rawFieldDesc = null;
if (StringUtil.isNotBlank(tableFieldMappingDTO.getSrcFieldName())) {
rawFieldDesc = rawFieldDescMap.get(tableFieldMappingDTO.getSrcFieldName());
if (rawFieldDesc == null) {
tableFieldMappingDTO.setSrcFieldName(null);
}
}
if (StringUtil.isBlank(tableFieldMappingDTO.getSrcFieldName()) && !oldFieldMap.containsKey(tableFieldMappingDTO.getFieldName())) {
throw new YuceExceptionBase("创建时" + tableFieldMappingDTO.getFieldName() + "对应的原始字段不存在");
}
deleteFieldMap.remove(tableFieldMappingDTO.getFieldName());
if (oldFieldMap.containsKey(tableFieldMappingDTO.getFieldName())) {
Field field = oldFieldMap.get(tableFieldMappingDTO.getFieldName());
Field update = new TableField();
update.setId(field.getId());
update.setStatus(Field.STATUS_DISPLAY);
update.setOrigFieldName(tableFieldMappingDTO.getSrcFieldName());
update.setOrigFieldType(rawFieldDesc == null ? null : rawFieldDesc.getFieldTypeEnum());
update.setFieldName(tableFieldMappingDTO.getFieldName());
if (tableFieldMappingDTO.getFieldTypeEnum() != null) {
update.setFieldTypeEnum(tableFieldMappingDTO.getFieldTypeEnum());
update.setFieldType(tableFieldMappingDTO.getFieldTypeEnum().getType());
}
if (StringUtil.isNotBlank(tableFieldMappingDTO.getFieldComment())) {
update.setAliasFieldName(tableFieldMappingDTO.getFieldComment());
}
updateTableField(tableId, update, true);
if (StringUtil.isBlank(tableFieldMappingDTO.getSrcFieldName())) {
rawTableFieldDao.updateOrigFieldNameIsNull(field.getId());
}
} else {
TableField tableField = convertRawDescToField(tableId, rawFieldDesc, tableFieldMappingDTO);
addFieldList.add(tableField);
}
}
addWorkTableFields(tableId, addFieldList);
if (!MapUtil.isEmpty(deleteFieldMap)) {
deleteFieldMap.values().forEach(fieldId -> {
invalidField(tableId, fieldId);
});
}
clearTableFieldCache(tableId);
}
@Override
public void addWorkTableFields(String tableId, List<Field> workTableFields) {
if (CollectionUtils.isEmpty(workTableFields)) {
return;
}
List<Field> origFields = getAllTableFields(tableId);
Map<String, Field> origFieldNameMap = Maps.newLinkedHashMap();
origFields.forEach(field -> {
if (field.getFieldName() == null) {
return;
}
origFieldNameMap.put(field.getFieldName(), field);
});
int pos = origFields.size();
for (Field fieldDesc : workTableFields) {
WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, fieldDesc);
try {
Field origField = origFieldNameMap.get(fieldDesc.getFieldName());
if (origField != null) {
WorkTableFieldDO origFieldDO = convertToTableFieldDO(tableId, origField);
origFieldDO.setFieldType(fieldDO.getFieldType());
origFieldDO.setFieldTypeEnum(fieldDO.getFieldTypeEnum());
origFieldDO.setOrigFieldType(fieldDO.getOrigFieldType() == null ? fieldDO.getFieldTypeEnum() : fieldDO.getOrigFieldType());
rawTableFieldDao.updateById(origFieldDO);
} else {
if (fieldDesc.getPos() == null) {
fieldDO.setPos(pos);
}
rawTableFieldDao.insert(fieldDO);
}
pos++;
} catch (Exception e) {
log.warn("exception catched when create table field, fieldDO: {}", JsonUtil.objectToJson(fieldDO), e);
}
}
clearTableFieldCache(tableId);
}
@Override
public void updateWorkTableFields(String tableId, List<Field> workTableFields) {
if (CollectionUtils.isEmpty(workTableFields)) {
return;
}
List<Field> origFields = getAllTableFields(tableId);
Map<String, Field> origFieldMaps = Maps.newHashMap();
for (Field origField : origFields) {
origFieldMaps.put(origField.getFieldName(), origField);
}
int pos = origFields.size();
for (Field workTableField : workTableFields) {
if (origFieldMaps.containsKey(workTableField.getFieldName())) {
Field field = origFieldMaps.get(workTableField.getFieldName());
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(field.getId());
fieldDO.setAliasFieldName(workTableField.getAliasFieldName());
if (workTableField instanceof TableField) {
fieldDO.setComment(((TableField) workTableField).getComment());
}
fieldDO.setSourceFieldId(workTableField.getSourceFieldId());
if (workTableField.getFieldTypeEnum() != null) {
fieldDO.setFieldTypeEnum(workTableField.getFieldTypeEnum().getCode());
fieldDO.setFieldType(workTableField.getFieldTypeEnum().getType());
}
if (workTableField.getOrigFieldType() != null) {
fieldDO.setOrigFieldType(workTableField.getOrigFieldType().getCode());
}
rawTableFieldDao.updateById(fieldDO);
} else {
WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
fieldDO.setPos(++pos);
rawTableFieldDao.insert(fieldDO);
assetRetrieveService.addOrUpdateAssetRetrieveWord(fieldDO.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
tableId, AssetRetrieveType.WORKTABLE, fieldDO.getAliasFieldName());
}
}
}
@Override
public void updateTableField(String tableId, Field workTableField) {
updateTableField(tableId, workTableField, false);
}
public void updateTableField(String tableId, Field workTableField, boolean ignoreUpdateStarrocks) {
Field oldField = loadFieldById(workTableField.getId());
WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
rawTableFieldDao.updateById(fieldDO);
WorkTable workTable = workTableService.loadWorkTableById(tableId);
if (!ignoreUpdateStarrocks && !oldField.getFieldType().equals(workTableField.getFieldType())) {
log.info("start to update workTableField {} oldField {}", workTableField, oldField);
TableOperator.getTableOperator().updateField(workTable.getStorageMedium(), workTable.getTableSchema(), workTable.getTableName(), getRawFieldDesc(workTableField));
TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
} else {
log.info("field {} not need update tableId {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(oldField));
}
cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);
cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(workTableField.getId());
}
@Override
public void updateTableField(String tableId, Field workTableField, Map<String, RawFieldDesc> rawFieldDescMap) {
Field oldField = loadFieldById(workTableField.getId());
WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
rawTableFieldDao.updateById(fieldDO);
WorkTable workTable = workTableService.loadWorkTableById(tableId);
RawFieldDesc rawFieldDesc = rawFieldDescMap.get(oldField.getFieldName());
if(rawFieldDesc != null) {
String workTableFieldType = workTableField.getFieldType().toLowerCase();
String rawFieldDescFieldType = rawFieldDesc.getDataType().toLowerCase();
if (!workTableFieldType.equals(rawFieldDescFieldType)) {
log.info("字段更新,新旧字段类型不一致 {} {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(rawFieldDesc));
TableOperator.getTableOperator().updateField(workTable.getStorageMedium(), workTable.getTableSchema(), workTable.getTableName(), getRawFieldDesc(workTableField));
TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
} else {
log.info("字段未更新,新旧字段类型一致 {} {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(rawFieldDesc));
}
}
cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);
cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(workTableField.getId());
}
private RawFieldDesc getRawFieldDesc(Field tableField) {
UserDefinedTableSchema tableSchema = new UserDefinedTableSchema();
UserDefinedTableSchema.Column column = new UserDefinedTableSchema.Column(0);
column.setFieldName(tableField.getFieldName());
column.setAliasFieldName(tableField.getAliasFieldName());
column.setFieldTypeEnum(tableField.getFieldTypeEnum());
column.setFieldType(tableField.getFieldType());
tableSchema.addColumn(column);
return tableSchema.toRawTableSchema().getFields().get(0);
}
@Override
public void updateWorkTableFieldsByRawDescs(String tableId, List<RawFieldDesc> rawFieldDescs) {
if (CollectionUtils.isEmpty(rawFieldDescs)) {
return;
}
List<Field> fields = rawFieldDescs.stream()
.map(fieldDesc -> convertRawDescToField(tableId, fieldDesc, null))
.collect(Collectors.toList());
updateWorkTableFields(tableId, fields);
}
protected TableField convertToFieldVO(WorkTableFieldDO fieldDO) {
if (fieldDO == null) {
;
return null;
}
TableField fieldVO = com.yuce.base.util.BeanUtils.copyObject(fieldDO, TableField.class, (src, dest) -> {
dest.setStatus(src.getStatus());
dest.setObjId(src.getTableId());
dest.setId(src.getId());
if (src.getFieldTypeEnum() != null) {
dest.setFieldTypeEnum(FieldTypeEnum.fromCode(src.getFieldTypeEnum()));
}
if (src.getOrigFieldType() != null) {
dest.setOrigFieldType(FieldTypeEnum.fromCode(src.getOrigFieldType()));
}
if (src.getAttrs() != null) {
dest.setAttrs(JsonUtil.jsonToObject(src.getAttrs(), Map.class));
}
if (src.getPartitionKey() != null) {
dest.setPartitionKey(src.getPartitionKey() == 1);
}
});
fieldVO.setObjId(fieldDO.getTableId());
return fieldVO;
}
protected WorkTableFieldDO convertToTableFieldDO(String tableId, Field field) {
WorkTableFieldDO fieldDO = com.yuce.base.util.BeanUtils.copyObject(field, WorkTableFieldDO.class, (src, dest) -> {
dest.setStatus(src.getStatus());
dest.setTableId(tableId);
if (src.getId() == null) {
dest.setId(UUIDUtil.getID());
}
if (src.getFieldTypeEnum() != null) {
dest.setFieldTypeEnum(src.getFieldTypeEnum().getCode());
}
if (src.getOrigFieldType() != null) {
dest.setOrigFieldType(src.getOrigFieldType().getCode());
}
if (!CollectionUtils.isEmpty(src.getAttrs())) {
dest.setAttrs(JsonUtil.objectToJson(src.getAttrs()));
}
if (src.getPartitionKey() != null) {
dest.setPartitionKey(src.getPartitionKey() ? 1 : 0);
}
});
return fieldDO;
}
protected TableField convertRawDescToField(String tableId, RawFieldDesc fieldDesc, TableFieldMappingForMigrateDTO tableFieldMappingDTO) {
TableField tableField = new TableField();
tableField.setObjId(tableId);
String fieldName = com.yuce.base.util.StringUtil.filterString(fieldDesc.getColumnName());
tableField.setFieldName(fieldName);
if (tableFieldMappingDTO != null && StringUtil.isNotBlank(tableFieldMappingDTO.getFieldName())) {
tableField.setFieldName(tableFieldMappingDTO.getFieldName());
}
tableField.setOrigFieldName(tableFieldMappingDTO == null ? fieldDesc.getColumnName() : tableFieldMappingDTO.getSrcFieldName());
if (!StringUtils.isEmpty(fieldDesc.getColumnComment())) {
tableField.setAliasFieldName(fieldDesc.getColumnComment());
} else {
tableField.setAliasFieldName(fieldDesc.getColumnName());
}
tableField.setAttr("indexType", fieldDesc.getColumnType());
if (fieldDesc.isUniKey()) {
tableField.setAttr("uniqRate", 1);
}
tableField.setComment(fieldDesc.getColumnComment());
if (tableFieldMappingDTO != null && StringUtil.isNotBlank(tableFieldMappingDTO.getFieldComment())) {
tableField.setAliasFieldName(tableFieldMappingDTO.getFieldComment());
}
tableField.setPos(fieldDesc.getOrdinalPosition());
tableField.setFieldType(fieldDesc.getColumnType());
tableField.setSourceFieldId(fieldDesc.getSrcFieldId());
FieldTypeEnum fieldTypeEnum = fieldDesc.getFieldTypeEnum();
if (fieldTypeEnum == null) {
fieldTypeEnum = FieldTypeEnum.guessEnumByType(fieldDesc.getColumnType());
}
tableField.setOrigFieldType(fieldTypeEnum);
if (tableFieldMappingDTO != null && tableFieldMappingDTO.getFieldTypeEnum() != null) {
fieldTypeEnum = tableFieldMappingDTO.getFieldTypeEnum();
tableField.setFieldType(fieldTypeEnum.getType());
}
tableField.setFieldTypeEnum(fieldTypeEnum);
tableField.setId(UUIDUtil.getID());
FieldLogicalTypeEnum fieldLogicalTypeEnum;
switch (fieldTypeEnum) {
case DATETIME:
fieldLogicalTypeEnum = FieldLogicalTypeEnum.DATETIME;
break;
case NUMBER:
fieldLogicalTypeEnum = FieldLogicalTypeEnum.ALL_NUMBER;
break;
default:
fieldLogicalTypeEnum = FieldLogicalTypeEnum.ALL_STRING;
}
tableField.setAttr("fieldLogicalType", fieldLogicalTypeEnum.getCode());
return tableField;
}
@Override
public List<Field> loadFieldByIds(Set<String> fieldIds) {
List<Field> fieldVOS = Lists.newArrayList();
QueryWrapper queryWrapper = new QueryWrapper();
queryWrapper.in("id", fieldIds);
List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(queryWrapper);
for (WorkTableFieldDO fieldDO : fieldDOS) {
TableField field = convertToFieldVO(fieldDO);
fieldVOS.add(field);
}
return fieldVOS;
}
@Override
public void clearTableFieldCache(String tableId) {
cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);
List<Field> fields = getAllTableFields(tableId);
if (CollectionUtils.isEmpty(fields)) {
return;
}
fields.forEach(field -> {
cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(field.getId());
});
}
@Override
public TableField loadFieldByFieldName(String tableId, String fieldName) {
QueryWrapper<WorkTableFieldDO> objectQueryWrapper = new QueryWrapper<>();
objectQueryWrapper.eq("table_id", tableId);
objectQueryWrapper.eq("field_name", fieldName);
List<WorkTableFieldDO> workTableFieldDOList = rawTableFieldDao.selectList(objectQueryWrapper);
if (!CollectionUtils.isEmpty(workTableFieldDOList)) {
return convertToFieldVO(workTableFieldDOList.get(0));
}
return null;
}
@Override
public List<TableField> loadFieldByFieldNames(String tableId, List<String> fieldNames) {
if (CollectionUtils.isEmpty(fieldNames)) {
return Lists.newArrayList();
}
QueryWrapper<WorkTableFieldDO> objectQueryWrapper = new QueryWrapper<>();
objectQueryWrapper.eq("table_id", tableId);
objectQueryWrapper.in("field_name", fieldNames);
List<WorkTableFieldDO> workTableFieldDOList = rawTableFieldDao.selectList(objectQueryWrapper);
if (CollectionUtils.isEmpty(workTableFieldDOList)) {
return Lists.newArrayList();
}
return workTableFieldDOList.stream().map(x -> convertToFieldVO(x)).collect(Collectors.toList());
}
@Override
public void addColumns(String tableId, Field workTableField) {
List<Field> origFields = getAllTableFields(tableId);
Set<String> origFieldNameSet = origFields.stream().map(Field::getFieldName).collect(Collectors.toSet());
if (origFieldNameSet.contains(workTableField.getFieldName())) {
throw new IllegalArgumentException("字段名已存在!");
}
WorkTable workTable = workTableService.loadWorkTableById(tableId);
DataSourceConfig dataSourceConfig = dataSourceManager.getCubeWriterDataSource(workTable.getStorageMedium());
if (dataSourceConfig == null) {
return;
}
DBOperator dbOperator = DBOperatorFactory.getDBOperator(dataSourceConfig);
if (!dbOperator.exists(workTable.getTableSchema(), workTable.getTableName())) {
return;
}
syncFieldToDb(workTable, workTableField.getFieldName(), workTableField.getFieldType(), workTableField.getFieldTypeEnum(), workTableField.getAliasFieldName());
int pos = origFields.size();
WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
fieldDO.setPos(++pos);
rawTableFieldDao.insert(fieldDO);
syncFieldToDb(workTable, workTableField.getFieldName(), workTableField.getFieldType(), workTableField.getFieldTypeEnum(), workTableField.getAliasFieldName());
TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
addCollectionFieldFromAssertTableField(tableId, workTableField);
applicationContext.publishEvent(new ResourceObjectEvent(tableId, ThreadLocalUtils.getUserId(), CubeCategoryEnum.DATA_OBJECT, OperatorEnum.UPDATE));
}
public void addCollectionFieldFromAssertTableField(String tableId, Field workTableField) {
List<DataCollection> collectionList = dataCollectionService.getTableRelatedCollections(tableId);
if (CollectionUtils.isEmpty(collectionList)) {
return;
}
for (DataCollection collection : collectionList) {
if (CollectionSourceEnum.ASSET_TABLE.equals(collection.getCollectionSourceEnum())) {
String collectionId = collection.getId();
CollectionField collectionField = com.yuce.base.util.BeanUtils.copyObject(workTableField, CollectionField.class);
collectionField.setId(UUIDUtil.getID());
collectionField.setCollectionId(collectionId);
collectionField.setStatus(CollectionField.STATUS_DISPLAY);
collectionFieldService.addCollectionField(collectionId, collectionField, true);
}
}
}
@Override
public void deleteFieldByFieldName(String tableId, String fieldName) {
if (StringUtil.isBlank(tableId) || StringUtil.isBlank(fieldName)) {
return;
}
LambdaQueryWrapper<WorkTableFieldDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(WorkTableFieldDO::getTableId, tableId);
queryWrapper.eq(WorkTableFieldDO::getFieldName, fieldName);
rawTableFieldDao.delete(queryWrapper);
}
@Override
public void updateStatusByTableId(String tableId, Integer status) {
if (StringUtil.isBlank(tableId) || status == null) {
return;
}
if (0 != status && 1 != status) {
return;
}
WorkTableFieldDO workTableFieldDO = new WorkTableFieldDO();
workTableFieldDO.setStatus(status);
LambdaQueryWrapper<WorkTableFieldDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(WorkTableFieldDO::getTableId, tableId);
rawTableFieldDao.update(workTableFieldDO, queryWrapper);
}
@Override
public void updateFieldStatus(String tableId, String fieldId, Integer status) {
WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
if (fieldDO == null) {
throw new ObjectNotExistException("表字段" + fieldId + "不存在");
}
WorkTableFieldDO update = new WorkTableFieldDO();
update.setId(fieldId);
update.setStatus(status);
rawTableFieldDao.updateById(update);
}
@Override
public void updateFieldIdAndTableName(String tableId, String fieldId, String newFieldId, String fieldName) {
if (StringUtil.isBlank(tableId) || StringUtil.isBlank(fieldId) || StringUtil.isBlank(newFieldId) || StringUtil.isBlank(fieldName)) {
return;
}
rawTableFieldDao.updateFieldIdAndTableName(fieldId, newFieldId, fieldName);
}
@Override
public List<TableFieldMappingForMigrateDTO> listTableFieldMappingForMigrateDTO(String tableId) {
if (StringUtil.isBlank(tableId)) {
return Lists.newArrayList();
}
List<Field> fields = getAllTableFields(tableId);
if (CollectionUtils.isEmpty(fields)) {
return Lists.newArrayList();
}
List<TableFieldMappingForMigrateDTO> retList = Lists.newArrayList();
for (Field field : fields) {
TableFieldMappingForMigrateDTO dto = TableFieldMappingForMigrateDTO.builder()
.srcFieldName(StringUtil.isBlank(field.getOrigFieldName()) ? field.getFieldName() : field.getOrigFieldName())
.fieldComment(field.getAliasFieldName())
.fieldTypeEnum(field.getFieldTypeEnum())
.fieldName(field.getFieldName())
.build();
retList.add(dto);
}
return retList;
}
@Override
public Map<FieldTypeEnum, List<StarrocksFieldTypeEnum>> getStarRocksStoreType() {
Map<FieldTypeEnum, List<StarrocksFieldTypeEnum>> res = new HashMap<>();
FieldTypeEnum[] values = FieldTypeEnum.values();
for (FieldTypeEnum fieldTypeEnum : values) {
List<StarrocksFieldTypeEnum> tmp = new ArrayList<>();
switch (fieldTypeEnum) {
case DATE:
tmp.add(StarrocksFieldTypeEnum.DATE);
break;
case DATETIME:
tmp.add(StarrocksFieldTypeEnum.DATETIME);
break;
case STRING:
tmp.add(StarrocksFieldTypeEnum.JSON);
tmp.add(StarrocksFieldTypeEnum.VARCHAR);
break;
case NUMBER:
tmp.add(StarrocksFieldTypeEnum.BIGINT);
tmp.add(StarrocksFieldTypeEnum.DECIMAL);
tmp.add(StarrocksFieldTypeEnum.DOUBLE);
break;
case GEOMETRY:
tmp.add(StarrocksFieldTypeEnum.BINARY);
break;
}
res.put(fieldTypeEnum, tmp);
}
return res;
}
private void syncFieldToDb(WorkTable workTable, String fieldName, String fieldType, FieldTypeEnum fieldTypeEnum, String fieldComment) {
RawFieldDesc fieldDesc = getRawFieldDesc(fieldName, fieldTypeEnum, fieldComment, fieldType);
if(workTable.getStorageMedium() == TableStorageMediumEnum.DATA_LAKE) {
DBOperator dbOperator = CubeSourceFactory.getDremioDBOperator(-1);
dbOperator.addField(CubeSourceFactory.getDremioTableSchemaName(workTable.getStorageMedium()), workTable.getTableName(), fieldDesc);
} else {
DBOperator dbOperator = CubeSourceFactory.getStarrocksDBOperator(workTable.getStorageMedium(), -1);
dbOperator.addField(workTable.getTableSchema(), workTable.getTableName(), getRawFieldDesc(fieldName, fieldTypeEnum, fieldComment, fieldType));
}
TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
}
@Override
public void copyTableFiled(String tableId, String sourceTableId) {
LambdaQueryWrapper<WorkTableFieldDO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(WorkTableFieldDO::getTableId, tableId);
List<WorkTableFieldDO> workTableFieldDOS = rawTableFieldDao.selectList(lambdaQueryWrapper);
for (WorkTableFieldDO workTableFieldDO : workTableFieldDOS) {
WorkTableFieldDO fieldDO = new WorkTableFieldDO();
BeanUtils.copyProperties(workTableFieldDO, fieldDO);
fieldDO.setId(UUIDUtil.getID());
fieldDO.setTableId(sourceTableId);
rawTableFieldDao.insert(fieldDO);
}
}
private RawFieldDesc getRawFieldDesc(String fieldName, FieldTypeEnum fieldTypeEnum, String fieldComment, String fieldType) {
UserDefinedTableSchema tableSchema = new UserDefinedTableSchema();
UserDefinedTableSchema.Column column = new UserDefinedTableSchema.Column(0);
column.setFieldName(fieldName);
column.setAliasFieldName(fieldComment);
column.setFieldTypeEnum(fieldTypeEnum);
column.setFieldType(fieldType);
tableSchema.addColumn(column);
return tableSchema.toRawTableSchema().getFields().get(0);
}
}