// Edit code (stray page text from the original paste, kept as a comment so the file parses)

package com.yuce.data.cube.service.worktable;

import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yuce.asset.model.enums.FieldLogicalTypeEnum;
import com.yuce.base.exception.YuceExceptionBase;
import com.yuce.base.util.JsonUtil;
import com.yuce.base.util.UUIDUtil;
import com.yuce.cube.event.ResourceObjectEvent;
import com.yuce.cube.shield.model.bo.ColumnPermissionBO;
import com.yuce.cube.shield.model.dto.ObjectAuthDTO;
import com.yuce.cube.shield.model.enums.PermissionObjectTypeEnum;
import com.yuce.cube.shield.service.ObjectAuthService;
import com.yuce.data.common.config.CacheConstants;
import com.yuce.data.common.config.CubeConfig;
import com.yuce.data.common.config.CubeSourceFactory;
import com.yuce.data.common.exception.ObjectNotExistException;
import com.yuce.data.common.model.collection.DataCollection;
import com.yuce.data.common.model.enums.CollectionSourceEnum;
import com.yuce.data.common.model.enums.CubeCategoryEnum;
import com.yuce.data.common.model.enums.OperatorEnum;
import com.yuce.data.common.model.field.Field;
import com.yuce.data.common.model.field.FieldAttrs;
import com.yuce.data.common.task.TableFieldMappingForMigrateDTO;
import com.yuce.data.common.util.ThreadLocalUtils;
import com.yuce.data.cube.dao.WorkTableFieldDao;
import com.yuce.data.cube.model.bo.analysis.field.CollectionField;
import com.yuce.data.cube.model.bo.table.TableField;
import com.yuce.data.cube.model.bo.table.UserDefinedTableSchema;
import com.yuce.data.cube.model.bo.table.WorkTable;
import com.yuce.data.cube.model.dataobject.WorkTableFieldDO;
import com.yuce.data.cube.model.enums.AssetRetrieveType;
import com.yuce.data.cube.model.enums.RetrieveWordSourceType;
import com.yuce.data.cube.model.enums.TableStorageMediumEnum;
import com.yuce.data.cube.service.analysis.DataCollectionService;
import com.yuce.data.cube.service.analysis.field.CollectionFieldService;
import com.yuce.data.cube.service.asset.AssetRetrieveService;
import com.yuce.data.cube.service.datasource.DataSourceManager;
import com.yuce.data.cube.util.TableOperator;
import com.yuce.data.rawdb.DBOperator;
import com.yuce.data.rawdb.DBOperatorFactory;
import com.yuce.data.rawdb.DataSourceConfig;
import com.yuce.data.rawdb.StarrocksFieldTypeEnum;
import com.yuce.data.rawdb.parser.SqlParser;
import com.yuce.data.rawdb.parser.SqlParserFactory;
import com.yuce.data.rawdb.schema.FieldTypeEnum;
import com.yuce.data.rawdb.schema.RawFieldDesc;
import com.yuce.data.rawdb.schema.RawTableSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.BooleanUtils;
import org.eclipse.jetty.util.StringUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

@Service
@Slf4j
public class WorkTableFieldServiceImpl implements WorkTableFieldService {

    // Spring cache manager; used below to evict cached table-field entries after a field update.
    @Autowired
    CacheManager cacheManager;

    // Resolves a user's permission DTO for a table, including column-level permissions.
    @Autowired
    private ObjectAuthService objectAuthService;

    // DAO for work-table field rows (WorkTableFieldDO).
    @Autowired
    private WorkTableFieldDao rawTableFieldDao;
    // Maintains asset retrieval words when fields are shown, hidden, renamed or inserted.
    @Autowired
    private AssetRetrieveService assetRetrieveService;
    // Lazy and optional injection — presumably to avoid a circular dependency; TODO confirm.
    @Lazy
    @Autowired(required = false)
    private WorkTableService workTableService;

    // Lazy and optional injection; used to look up writer data sources and source table schemas.
    @Lazy
    @Autowired(required = false)
    private DataSourceManager dataSourceManager;

    // Not referenced in the visible part of this class.
    @Autowired
    private DataCollectionService dataCollectionService;

    // Lazily injected; not referenced in the visible part of this class.
    @Lazy
    @Autowired
    private CollectionFieldService collectionFieldService;

    // Not referenced in the visible part of this class.
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Loads every stored field of a work table without consulting the physical table schema.
     *
     * @param tableId id of the work table
     * @return the result of {@link #getAllTableFields(String, boolean)} with
     *         {@code getRawFieldType = false}
     */
    @Override
    public List<Field> getAllTableFields(String tableId) {
        final boolean resolveRawFieldType = false;
        return getAllTableFields(tableId, resolveRawFieldType);
    }

    /**
     * Loads every stored field of a work table ordered by {@code pos}, optionally overriding
     * each field's type with the data type reported by the physical table.
     *
     * <p>Resolving the raw types is best effort: if the work table cannot be loaded, or reading
     * the physical schema fails, the stored field types are returned unchanged and a warning is
     * logged.
     *
     * @param tableId         id of the work table
     * @param getRawFieldType when {@code true}, replace the field type of every field whose name
     *                        matches a physical column with that column's data type
     * @return mutable list of fields, never {@code null}
     */
    @Override
    public List<Field> getAllTableFields(String tableId, boolean getRawFieldType) {
        QueryWrapper<WorkTableFieldDO> fieldQuery = new QueryWrapper<>();
        fieldQuery.eq("table_id", tableId);
        fieldQuery.orderByAsc("pos");

        List<Field> fields = Lists.newArrayList();
        for (WorkTableFieldDO fieldDO : rawTableFieldDao.selectList(fieldQuery)) {
            fields.add(convertToFieldVO(fieldDO));
        }
        if (!getRawFieldType) {
            return fields;
        }

        WorkTable workTable = workTableService.loadWorkTableById(tableId);
        if (workTable == null) {
            return fields;
        }

        try {
            RawTableSchema tableSchema = TableOperator.getTableOperator().getTableSchema(workTable.getStorageMedium(), workTable.getTableName());
            Map<String, RawFieldDesc> rawFieldDescMap = tableSchema.getFields().stream()
                    .collect(Collectors.toMap(RawFieldDesc::getColumnName, Function.identity()));

            for (Field field : fields) {
                // toMap never stores null values, so a null lookup means "no such physical column".
                RawFieldDesc rawFieldDesc = rawFieldDescMap.get(field.getFieldName());
                if (rawFieldDesc == null) {
                    continue;
                }
                field.setFieldType(rawFieldDesc.getDataType());
                field.setStarrocksFieldTypeEnum(StarrocksFieldTypeEnum.findEnumByDataType(field.getFieldType()));
            }
        } catch (Exception e) {
            // Best effort: keep the stored types, but say which table failed instead of logging an empty message.
            log.warn("failed to resolve raw field types for table {}, keeping stored field types", tableId, e);
        }
        return fields;
    }

    /**
     * Loads the fields of a work table whose {@code status} column is {@code 0}, ordered by
     * {@code pos}.
     *
     * @param tableId id of the work table
     * @return mutable list of the matching fields, never {@code null}
     */
    @Override
    public List<Field> getValidTableFields(String tableId) {
        QueryWrapper<WorkTableFieldDO> visibleFieldQuery = new QueryWrapper<>();
        visibleFieldQuery.eq("table_id", tableId)
                .eq("status", 0)
                .orderByAsc("pos");

        List<WorkTableFieldDO> rows = rawTableFieldDao.selectList(visibleFieldQuery);
        List<Field> result = Lists.newArrayList();
        rows.forEach(row -> result.add(convertToFieldVO(row)));
        return result;
    }

    /**
     * Returns the valid fields of a table, restricted by the user's column permission.
     *
     * @param tableId id of the work table
     * @param userId  user whose permission on the table is applied
     * @return the valid fields after {@link #filterTableUnanalysizableFields} has been applied
     */
    @Override
    public List<Field> getTableFieldsHavingPermissions(String tableId, String userId) {
        List<Field> validFields = getValidTableFields(tableId);
        return filterTableUnanalysizableFields(tableId, validFields, userId);
    }


    /**
     * Filters out the fields of a work table that the given user may not analyze.
     *
     * <p>Filtering only happens when a permission DTO exists for the user, the user is not an
     * admin, and column permission is enabled. Otherwise the input list is returned as is.
     * When filtering applies and no column permission object is present, no field passes.
     *
     * @param tableId     id of the work table
     * @param tableFields candidate fields
     * @param owner       user whose permission on the table is looked up
     * @return the input list, or a new list holding only the analyzable fields
     */
    protected List<Field> filterTableUnanalysizableFields(String tableId, List<Field> tableFields, String owner) {
        ObjectAuthDTO authDTO = objectAuthService.getUserObjectAuthPermissionDTO(PermissionObjectTypeEnum.TABLE, tableId, owner);
        boolean columnPermissionApplies = authDTO != null
                && !authDTO.isAdmin()
                && authDTO.isEnableColumnPermission();
        if (!columnPermissionApplies) {
            return tableFields;
        }

        Set<String> allowedFieldIds = Sets.newHashSet();
        ColumnPermissionBO columnPermission = authDTO.getColumnPermission();
        if (columnPermission != null) {
            allowedFieldIds.addAll(columnPermission.getAnalyzableColumns());
        }

        List<Field> analyzableFields = new ArrayList<>();
        for (Field candidate : tableFields) {
            if (allowedFieldIds.contains(candidate.getId())) {
                analyzableFields.add(candidate);
            }
        }
        return analyzableFields;
    }


    /**
     * Deletes every field row that belongs to the given work table.
     *
     * @param tableId id of the work table
     */
    @Override
    public void deleteTableFields(String tableId) {
        UpdateWrapper<WorkTableFieldDO> byTable = new UpdateWrapper<WorkTableFieldDO>().eq("table_id", tableId);
        rawTableFieldDao.delete(byTable);
    }

    /**
     * Loads a single field by its id.
     *
     * @param id field id
     * @return the converted field, or {@code null} when no row with that id exists
     */
    @Override
    public TableField loadFieldById(String id) {
        WorkTableFieldDO row = rawTableFieldDao.selectById(id);
        return convertToFieldVO(row);
    }

    /**
     * Sets exactly the given fields of a table to the display state and hides all others.
     *
     * <p>Fields already in the target state are left untouched. Retrieval words are added or
     * updated for fields that become displayed and deleted for fields that become hidden.
     * A {@code null} id list is treated as empty, and a stored field with a {@code null} status
     * is treated as "not yet in the target state" instead of failing with a
     * {@link NullPointerException}.
     *
     * @param tableId     id of the work table
     * @param validFields ids of the fields that must be displayed
     * @throws YuceExceptionBase if none of the table's fields is contained in {@code validFields}
     */
    @Override
    public void setTableDisplayFields(String tableId, List<String> validFields) {
        log.info("start to set valid fields {} on table {}", validFields, tableId);

        QueryWrapper<WorkTableFieldDO> fieldQuery = new QueryWrapper<>();
        fieldQuery.eq("table_id", tableId);
        fieldQuery.orderByAsc("pos");

        Set<String> fieldSet = new HashSet<>();
        if (validFields != null) {
            fieldSet.addAll(validFields);
        }

        List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(fieldQuery);
        List<WorkTableFieldDO> tosetValidFields = Lists.newArrayList();
        List<WorkTableFieldDO> tosetInValidFields = Lists.newArrayList();

        // Split all fields of the table into the set to display and the set to hide.
        for (WorkTableFieldDO fieldDO : fieldDOS) {
            if (fieldSet.contains(fieldDO.getId())) {
                tosetValidFields.add(fieldDO);
            } else {
                tosetInValidFields.add(fieldDO);
            }
        }

        if (CollectionUtils.isEmpty(tosetValidFields)) {
            throw new YuceExceptionBase("重置字段后,表" + tableId + "中没有有效字段,忽略该操作");
        }

        // Switch the requested fields to the display state.
        for (WorkTableFieldDO validFieldDO : tosetValidFields) {
            // Null-safe comparison: a row without a status is simply not "already displayed".
            if (Objects.equals(Field.STATUS_DISPLAY, validFieldDO.getStatus())) {
                continue;
            }
            validFieldDO.setStatus(Field.STATUS_DISPLAY);

            log.info("start to set valid status for field {}", validFieldDO);
            rawTableFieldDao.updateById(validFieldDO);

            assetRetrieveService.addOrUpdateAssetRetrieveWord(validFieldDO.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
                    tableId, AssetRetrieveType.WORKTABLE, validFieldDO.getAliasFieldName());
        }

        // Switch every other field to the hidden state.
        for (WorkTableFieldDO invalidFieldDO : tosetInValidFields) {
            if (Objects.equals(Field.STATUS_HIDE, invalidFieldDO.getStatus())) {
                continue;
            }
            invalidFieldDO.setStatus(Field.STATUS_HIDE);

            log.info("start to set invalid status for field {}", invalidFieldDO);
            rawTableFieldDao.updateById(invalidFieldDO);
            assetRetrieveService.deleteRetrievalWordBySourceId(invalidFieldDO.getId());
        }
    }

    /**
     * Switches a field to the display state and refreshes its retrieval word.
     *
     * <p>Nothing is written when the field is already displayed.
     *
     * @param tableId id of the work table (not used for the lookup; the field's own table id is
     *                used for the retrieval word)
     * @param fieldId id of the field to display
     * @throws ObjectNotExistException if no field with {@code fieldId} exists; previously this
     *                                 case failed with a {@link NullPointerException}
     */
    @Override
    public void validField(String tableId, String fieldId) {
        log.info("start to valid field {}", fieldId);
        WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
        if (fieldDO == null) {
            // Same not-found contract as the other update methods in this class.
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }
        if (Field.STATUS_DISPLAY.equals(fieldDO.getStatus())) {
            return;
        }

        fieldDO.setStatus(Field.STATUS_DISPLAY);
        rawTableFieldDao.updateById(fieldDO);

        // Refresh the retrieval word for the field that just became visible.
        assetRetrieveService.addOrUpdateAssetRetrieveWord(fieldDO.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
                fieldDO.getTableId(), AssetRetrieveType.WORKTABLE, fieldDO.getAliasFieldName());
    }

    /**
     * Sets the {@code status} column of a field to {@code 1} and deletes its retrieval word.
     *
     * @param tableId id of the work table (not used by the update)
     * @param fieldId id of the field to hide
     */
    @Override
    public void invalidField(String tableId, String fieldId) {
        log.info("start to invalid field {}", fieldId);

        UpdateWrapper<WorkTableFieldDO> hideFieldUpdate = new UpdateWrapper<>();
        hideFieldUpdate.eq("id", fieldId).set("status", 1);
        rawTableFieldDao.update(null, hideFieldUpdate);

        assetRetrieveService.deleteRetrievalWordBySourceId(fieldId);
    }

    /**
     * Hides the first field of a table that has the given field name and deletes its
     * retrieval word.
     *
     * @param tableId   id of the work table
     * @param fieldName name of the field to hide
     * @throws ObjectNotExistException if the table has no field with that name
     */
    @Override
    public void invalidFieldByFieldName(String tableId, String fieldName) {
        log.info("start to invalid field tableId : {}, fieldName:{}", tableId, fieldName);

        List<WorkTableFieldDO> matches = rawTableFieldDao.selectList(
                new QueryWrapper<WorkTableFieldDO>()
                        .eq("table_id", tableId)
                        .eq("field_name", fieldName));
        if (CollectionUtils.isEmpty(matches)) {
            throw new ObjectNotExistException("表" + tableId + "中没有字段" + fieldName);
        }

        WorkTableFieldDO target = matches.get(0);
        target.setStatus(Field.STATUS_HIDE);
        rawTableFieldDao.updateById(target);
        assetRetrieveService.deleteRetrievalWordBySourceId(target.getId());
    }

    /**
     * Renames a field's alias, refreshes its retrieval word and writes the alias as the column
     * comment of the underlying table.
     *
     * @param tableId        id of the work table whose physical column comment is updated
     * @param fieldId        id of the field
     * @param fieldAliasName new alias
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void updateFieldAliasName(String tableId, String fieldId, String fieldAliasName) {
        WorkTableFieldDO storedField = rawTableFieldDao.selectById(fieldId);
        if (storedField == null) {
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }

        storedField.setAliasFieldName(fieldAliasName);
        rawTableFieldDao.updateById(storedField);

        // The retrieval word is keyed by the field's own table id, not the tableId argument.
        String owningTableId = storedField.getTableId();
        assetRetrieveService.addOrUpdateAssetRetrieveWord(fieldId, RetrieveWordSourceType.WORKTABLE_FIELD,
                owningTableId, AssetRetrieveType.WORKTABLE, fieldAliasName);

        modifyCubeTableFieldComment(workTableService.loadWorkTableById(tableId), storedField.getFieldName(), fieldAliasName);
    }

    /**
     * Writes a column comment on the physical table behind a work table (best effort).
     *
     * <p>Nothing happens when the work table is {@code null}, Dremio is enabled, no writer data
     * source is configured for the table's storage medium, or the physical table does not exist.
     * Failures while checking or executing the statement are logged and swallowed.
     *
     * <p>Single quotes in the comment are doubled before the statement is built, so a comment
     * containing {@code '} can neither break nor extend the SQL. A {@code null} comment is
     * written as an empty comment instead of the literal text {@code "null"}.
     *
     * @param workTable    work table whose physical column is commented; may be {@code null}
     * @param fieldName    physical column name
     * @param fieldComment comment text; may be {@code null}
     */
    private void modifyCubeTableFieldComment(WorkTable workTable, String fieldName, String fieldComment) {
        // Check the Dremio switch first so no parser or DB operator is created needlessly.
        if (workTable == null || CubeConfig.isEnableDremio()) {
            return;
        }

        DataSourceConfig dataSourceConfig = dataSourceManager.getCubeWriterDataSource(workTable.getStorageMedium());
        if (dataSourceConfig == null) {
            return;
        }

        SqlParser sqlParser = SqlParserFactory.getSqlParser(dataSourceConfig.getSourceTypeEnum());
        DBOperator dbOperator = DBOperatorFactory.getDBOperator(dataSourceConfig);

        // Standard SQL escaping of a string literal: ' -> ''.
        String escapedComment = fieldComment == null ? "" : fieldComment.replace("'", "''");
        String commentSql = String.format("comment on column %s.%s is '%s'",
                sqlParser.normalizeTableName(workTable.getTableSchema(), workTable.getTableName()),
                sqlParser.normalizeName(fieldName), escapedComment);

        try {
            if (dbOperator.exists(workTable.getTableSchema(), workTable.getTableName())) {
                dbOperator.execute(commentSql);
            }
        } catch (Exception e) {
            log.warn("failed to update comment of column {} on table {}", fieldName, workTable.getTableName(), e);
        }
    }

    /**
     * Changes the field type enum of a field.
     *
     * <p>The stored original field type is left as it is. The previous implementation
     * re-assigned it to itself, which had no effect and has been removed.
     * NOTE(review): if the intent was to back-fill a missing original type, that needs a
     * separate change — confirm with the owner.
     *
     * @param tableId   id of the work table (not used by the update)
     * @param fieldId   id of the field
     * @param fieldType new field type
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void updateFieldType(String tableId, String fieldId, FieldTypeEnum fieldType) {
        WorkTableFieldDO fieldDO = rawTableFieldDao.selectById(fieldId);
        if (fieldDO == null) {
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }
        fieldDO.setFieldTypeEnum(fieldType.getCode());
        rawTableFieldDao.updateById(fieldDO);
    }

    /**
     * Stores a field's logical type under the {@code fieldLogicalType} attribute.
     *
     * @param tableId              id of the work table (not used by the update)
     * @param fieldId              id of the field
     * @param fieldLogicalTypeEnum logical type whose code is stored
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void updateFieldLogicalType(String tableId, String fieldId, FieldLogicalTypeEnum fieldLogicalTypeEnum) {
        WorkTableFieldDO storedField = rawTableFieldDao.selectById(fieldId);
        if (null == storedField) {
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }

        storedField.addAttr("fieldLogicalType", fieldLogicalTypeEnum.getCode());
        rawTableFieldDao.updateById(storedField);
    }

    /**
     * Updates a field's comment, both on the row itself and in its attribute map.
     *
     * @param tableId id of the work table (not used by the update)
     * @param fieldId id of the field
     * @param comment new comment text
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void updateFieldComment(String tableId, String fieldId, String comment) {
        WorkTableFieldDO storedField = rawTableFieldDao.selectById(fieldId);
        if (null == storedField) {
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }

        // The comment is kept in two places: the dedicated property and the attribute map.
        storedField.setComment(comment);
        storedField.addAttr(FieldAttrs.COMMENT, comment);
        rawTableFieldDao.updateById(storedField);
    }

    /**
     * Moves a field so that it sits directly after another field.
     *
     * <p>The position of the preceding field is looked up (0 when the lookup returns no
     * position), {@code plusPos} is called with that position for the field's table, and the
     * moved field is stored at that position plus one.
     *
     * @param tableId      id of the work table (the field's own table id is used instead)
     * @param fieldId      id of the field to move
     * @param aheadFieldId id of the field that should precede it
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void adjustFieldPos(String tableId, String fieldId, String aheadFieldId) {
        WorkTableFieldDO movingField = rawTableFieldDao.selectById(fieldId);
        if (movingField == null) {
            throw new ObjectNotExistException("字段" + fieldId + "不存在");
        }

        // Position of the preceding field; falls back to 0 when none is found.
        Integer anchorPos = Optional.ofNullable(rawTableFieldDao.getFieldPos(aheadFieldId)).orElse(0);

        // Make room after the anchor, then drop the moved field into the gap.
        rawTableFieldDao.plusPos(movingField.getTableId(), anchorPos);
        movingField.setPos(anchorPos + 1);
        rawTableFieldDao.updateById(movingField);
    }

    /**
     * Converts raw column descriptions into fields, adds them to the table and clears the
     * table's field cache. Does nothing for a null or empty list.
     *
     * @param tableId       id of the work table
     * @param rawFieldDescs raw column descriptions
     */
    @Override
    public void addWorkTableFieldsByRawDescs(String tableId, List<RawFieldDesc> rawFieldDescs) {
        if (CollectionUtils.isEmpty(rawFieldDescs)) {
            return;
        }

        List<Field> converted = new ArrayList<>();
        for (RawFieldDesc rawFieldDesc : rawFieldDescs) {
            converted.add(convertRawDescToField(tableId, rawFieldDesc, null));
        }

        addWorkTableFields(tableId, converted);
        clearTableFieldCache(tableId);
    }

    /**
     * Synchronizes the fields of a work table with a list of source-to-target field mappings.
     *
     * <p>For every mapping entry: a field that already exists under the target name is updated
     * and set to the display state, otherwise a new field is built from the source column and
     * inserted. Existing fields that no mapping entry mentions are hidden via
     * {@link #invalidField(String, String)}. The table's field cache is cleared at the end.
     *
     * @param tableId               id of the work table; blank ids are ignored
     * @param tableSchema           schema of the source table; when {@code null} it is loaded
     *                              through the work table's source id and source table name
     * @param tableFieldMappingList mappings to apply; a null or empty list is ignored
     * @throws YuceExceptionBase if a mapping would create a new field but has no existing
     *                           source column
     */
    @Override
    public void addWorkTableFieldsByMapping(String tableId, RawTableSchema tableSchema, List<TableFieldMappingForMigrateDTO> tableFieldMappingList) {
        if (StringUtil.isBlank(tableId) || CollectionUtils.isEmpty(tableFieldMappingList)) {
            return;
        }
        if (tableSchema == null) {
            WorkTable workTable = workTableService.loadWorkTableById(tableId);
            tableSchema = dataSourceManager.getTableSchema(workTable.getSourceId(), workTable.getSrcTableName());
        }

        List<Field> oldFieldList = getAllTableFields(tableId);
        // oldFieldMap: existing fields by name. deleteFieldMap: name -> id of fields still
        // unaccounted for; entries are removed as mappings mention them.
        Map<String, Field> oldFieldMap = null;
        Map<String, String> deleteFieldMap = null;
        if (!CollectionUtils.isEmpty(oldFieldList)) {
            oldFieldMap = oldFieldList.stream().collect(Collectors.toMap(Field::getFieldName, Function.identity()));
            deleteFieldMap = oldFieldList.stream().collect(Collectors.toMap(Field::getFieldName, Field::getId));
        } else {
            oldFieldMap = Maps.newHashMap();
            deleteFieldMap = Maps.newHashMap();
        }

        // Columns of the source table, indexed by column name.
        List<RawFieldDesc> rawFieldDescs = tableSchema.getFields();
        Map<String, RawFieldDesc> rawFieldDescMap = rawFieldDescs.stream().collect(Collectors.toMap(RawFieldDesc::getColumnName, Function.identity()));
        List<Field> addFieldList = Lists.newArrayList();

        // Apply each mapping entry.
        for (TableFieldMappingForMigrateDTO tableFieldMappingDTO : tableFieldMappingList) {
            // Validate the source field: a name that matches no source column is cleared.
            RawFieldDesc rawFieldDesc = null;
            if (StringUtil.isNotBlank(tableFieldMappingDTO.getSrcFieldName())) {
                rawFieldDesc = rawFieldDescMap.get(tableFieldMappingDTO.getSrcFieldName());
                if (rawFieldDesc == null) {
                    // A missing source column is tolerated here (it used to raise an exception).
                    tableFieldMappingDTO.setSrcFieldName(null);
                }
            }

            // The source field may be absent for an update, but creating a new field requires it.
            if (StringUtil.isBlank(tableFieldMappingDTO.getSrcFieldName()) && !oldFieldMap.containsKey(tableFieldMappingDTO.getFieldName())) {
                throw new YuceExceptionBase("创建时" + tableFieldMappingDTO.getFieldName() + "对应的原始字段不存在");
            }

            // This field is mentioned by a mapping, so it must not be hidden afterwards.
            deleteFieldMap.remove(tableFieldMappingDTO.getFieldName());

            if (oldFieldMap.containsKey(tableFieldMappingDTO.getFieldName())) {
                // Update the existing field.
                Field field = oldFieldMap.get(tableFieldMappingDTO.getFieldName());
                Field update = new TableField();
                update.setId(field.getId());
                update.setStatus(Field.STATUS_DISPLAY);
                update.setOrigFieldName(tableFieldMappingDTO.getSrcFieldName());
                // Without a source column the original type is passed as null; presumably
                // updateById skips null properties so the stored value is kept — TODO confirm.
                update.setOrigFieldType(rawFieldDesc == null ? null : rawFieldDesc.getFieldTypeEnum());
                update.setFieldName(tableFieldMappingDTO.getFieldName());
                // Override the field type when the mapping specifies one.
                if (tableFieldMappingDTO.getFieldTypeEnum() != null) {
                    update.setFieldTypeEnum(tableFieldMappingDTO.getFieldTypeEnum());
                    update.setFieldType(tableFieldMappingDTO.getFieldTypeEnum().getType());
                }
                if (StringUtil.isNotBlank(tableFieldMappingDTO.getFieldComment())) {
                    update.setAliasFieldName(tableFieldMappingDTO.getFieldComment());
                }

                // true = do not touch the physical table for this update.
                updateTableField(tableId, update, true);

                // A blank source field name is cleared through a dedicated DAO call.
                if (StringUtil.isBlank(tableFieldMappingDTO.getSrcFieldName())) {
                    rawTableFieldDao.updateOrigFieldNameIsNull(field.getId());
                }

            } else {
                // Create a new field from the source column.
                TableField tableField = convertRawDescToField(tableId, rawFieldDesc, tableFieldMappingDTO);
                addFieldList.add(tableField);
            }
        }
        // Insert the newly created fields.
        addWorkTableFields(tableId, addFieldList);

        // Hide existing fields that no mapping entry mentioned.
        if (!MapUtil.isEmpty(deleteFieldMap)) {
            deleteFieldMap.values().forEach(fieldId -> {
                invalidField(tableId, fieldId);
            });
        }

        clearTableFieldCache(tableId);
    }

    /**
     * Adds fields to a work table, or refreshes the type information of fields that already
     * exist under the same name.
     *
     * <p>Each field is processed on its own: a failure while inserting or updating one field is
     * logged and the remaining fields are still processed. New fields without an explicit
     * position receive the running position counter, which starts at the number of existing
     * fields and advances after every successfully processed field. The table's field cache is
     * cleared at the end.
     *
     * @param tableId         id of the work table
     * @param workTableFields fields to add; a null or empty list is ignored
     */
    @Override
    public void addWorkTableFields(String tableId, List<Field> workTableFields) {
        if (CollectionUtils.isEmpty(workTableFields)) {
            return;
        }

        List<Field> existingFields = getAllTableFields(tableId);
        Map<String, Field> existingByName = Maps.newLinkedHashMap();
        for (Field existing : existingFields) {
            String existingName = existing.getFieldName();
            if (existingName != null) {
                existingByName.put(existingName, existing);
            }
        }

        int nextPos = existingFields.size();
        for (Field incoming : workTableFields) {
            WorkTableFieldDO incomingDO = convertToTableFieldDO(tableId, incoming);
            try {
                Field existing = existingByName.get(incoming.getFieldName());
                if (existing == null) {
                    // New field: assign the running position unless the caller supplied one.
                    if (incoming.getPos() == null) {
                        incomingDO.setPos(nextPos);
                    }
                    rawTableFieldDao.insert(incomingDO);
                } else {
                    // Known field: only the type information is refreshed.
                    WorkTableFieldDO existingDO = convertToTableFieldDO(tableId, existing);
                    existingDO.setFieldType(incomingDO.getFieldType());
                    existingDO.setFieldTypeEnum(incomingDO.getFieldTypeEnum());
                    existingDO.setOrigFieldType(incomingDO.getOrigFieldType() != null
                            ? incomingDO.getOrigFieldType()
                            : incomingDO.getFieldTypeEnum());
                    rawTableFieldDao.updateById(existingDO);
                }
                nextPos += 1;
            } catch (Exception e) {
                log.warn("exception catched when create table field, fieldDO: {}", JsonUtil.objectToJson(incomingDO), e);
            }
        }

        // Drop cached field lists for this table.
        clearTableFieldCache(tableId);
    }

    /**
     * Updates the fields of a work table that match by name and inserts the ones that do not.
     *
     * <p>For a matching field the alias, comment (only for {@link TableField} inputs), source
     * field id and, when present, the type information are overwritten. A new field is inserted
     * at the next position after the existing fields and gets a retrieval word.
     *
     * @param tableId         id of the work table
     * @param workTableFields fields to apply; a null or empty list is ignored
     */
    @Override
    public void updateWorkTableFields(String tableId, List<Field> workTableFields) {
        if (CollectionUtils.isEmpty(workTableFields)) {
            return;
        }

        List<Field> storedFields = getAllTableFields(tableId);
        Map<String, Field> storedByName = Maps.newHashMap();
        storedFields.forEach(stored -> storedByName.put(stored.getFieldName(), stored));

        int lastPos = storedFields.size();
        for (Field incoming : workTableFields) {
            Field stored = storedByName.get(incoming.getFieldName());
            if (stored == null) {
                // Unknown name: insert as a new field after the current last position.
                WorkTableFieldDO newRow = convertToTableFieldDO(tableId, incoming);
                lastPos += 1;
                newRow.setPos(lastPos);
                rawTableFieldDao.insert(newRow);
                assetRetrieveService.addOrUpdateAssetRetrieveWord(newRow.getId(), RetrieveWordSourceType.WORKTABLE_FIELD,
                        tableId, AssetRetrieveType.WORKTABLE, newRow.getAliasFieldName());
                continue;
            }

            // Known name: reload the row and overwrite the mutable properties.
            WorkTableFieldDO row = rawTableFieldDao.selectById(stored.getId());
            row.setAliasFieldName(incoming.getAliasFieldName());
            if (incoming instanceof TableField) {
                TableField incomingTableField = (TableField) incoming;
                row.setComment(incomingTableField.getComment());
            }
            row.setSourceFieldId(incoming.getSourceFieldId());

            FieldTypeEnum incomingType = incoming.getFieldTypeEnum();
            if (incomingType != null) {
                row.setFieldTypeEnum(incomingType.getCode());
                row.setFieldType(incomingType.getType());
            }
            FieldTypeEnum incomingOrigType = incoming.getOrigFieldType();
            if (incomingOrigType != null) {
                row.setOrigFieldType(incomingOrigType.getCode());
            }
            rawTableFieldDao.updateById(row);
        }
    }

    /**
     * Updates a field and, when its type changed, the physical table as well.
     *
     * @param tableId        id of the work table
     * @param workTableField new state of the field
     */
    @Override
    public void updateTableField(String tableId, Field workTableField) {
        final boolean ignoreUpdateStarrocks = false;
        updateTableField(tableId, workTableField, ignoreUpdateStarrocks);
    }

    /**
     * Updates a stored field and optionally propagates a type change to the physical table.
     *
     * <p>The physical table is altered and its metadata refreshed only when
     * {@code ignoreUpdateStarrocks} is {@code false} and the stored field type differs from the
     * new one. The comparison is null-safe: when no stored field exists for the id the physical
     * update is skipped rather than failing with a {@link NullPointerException} after the row
     * was already written. Both field caches are evicted in every case.
     *
     * @param tableId               id of the work table
     * @param workTableField        new state of the field; its id selects the row
     * @param ignoreUpdateStarrocks when {@code true}, never touch the physical table
     */
    public void updateTableField(String tableId, Field workTableField, boolean ignoreUpdateStarrocks) {
        Field oldField = loadFieldById(workTableField.getId());
        WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
        rawTableFieldDao.updateById(fieldDO);
        WorkTable workTable = workTableService.loadWorkTableById(tableId);

        boolean typeChanged = oldField != null
                && !Objects.equals(oldField.getFieldType(), workTableField.getFieldType());
        if (!ignoreUpdateStarrocks && typeChanged) {
            // Propagate the new column definition to the physical table.
            log.info("start to update workTableField {} oldField {}", workTableField, oldField);
            TableOperator.getTableOperator().updateField(workTable.getStorageMedium(), workTable.getTableSchema(), workTable.getTableName(), getRawFieldDesc(workTableField));

            TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
        } else {
            // The second argument is the previous field state, so label it as such.
            log.info("field {} does not need a physical update, oldField {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(oldField));
        }

        cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);
        cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(workTableField.getId());
    }

    /**
     * Updates a stored field and aligns the physical column type with it when they differ.
     *
     * <p>The physical column is looked up in {@code rawFieldDescMap} by the stored field's
     * name. When it is found and its data type differs from the new field type (compared case
     * insensitively), the physical column is altered and the table metadata refreshed. The
     * physical update is skipped when the stored field does not exist, the map is {@code null},
     * or either type string is {@code null}. These cases previously failed with a
     * {@link NullPointerException} after the row was written and before the caches were evicted.
     * Both field caches are evicted in every case.
     *
     * @param tableId         id of the work table
     * @param workTableField  new state of the field; its id selects the row
     * @param rawFieldDescMap physical columns of the table, keyed by column name
     */
    @Override
    public void updateTableField(String tableId, Field workTableField, Map<String, RawFieldDesc> rawFieldDescMap) {
        Field oldField = loadFieldById(workTableField.getId());
        WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
        rawTableFieldDao.updateById(fieldDO);
        WorkTable workTable = workTableService.loadWorkTableById(tableId);

        RawFieldDesc rawFieldDesc = (oldField == null || rawFieldDescMap == null)
                ? null
                : rawFieldDescMap.get(oldField.getFieldName());
        if (rawFieldDesc != null) {
            String workTableFieldType = workTableField.getFieldType();
            String rawFieldDescFieldType = rawFieldDesc.getDataType();
            if (workTableFieldType == null || rawFieldDescFieldType == null) {
                log.warn("skip physical update of field {} on table {}: field type {} / physical type {}",
                        workTableField.getId(), tableId, workTableFieldType, rawFieldDescFieldType);
            } else if (!workTableFieldType.equalsIgnoreCase(rawFieldDescFieldType)) {
                // Types differ: alter the physical column. equalsIgnoreCase avoids the
                // locale-dependent behavior of toLowerCase().
                log.info("字段更新,新旧字段类型不一致 {}  {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(rawFieldDesc));
                TableOperator.getTableOperator().updateField(workTable.getStorageMedium(), workTable.getTableSchema(), workTable.getTableName(), getRawFieldDesc(workTableField));
                TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());
            } else {
                log.info("字段未更新,新旧字段类型一致 {}  {}", JSONObject.toJSONString(workTableField), JSONObject.toJSONString(rawFieldDesc));
            }
        }

        cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);
        cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(workTableField.getId());
    }

    /**
     * Builds the raw column description for a single field by wrapping it in a one-column
     * user-defined schema and converting that schema.
     *
     * @param tableField field to describe
     * @return the first (and only) column of the converted schema
     */
    private RawFieldDesc getRawFieldDesc(Field tableField) {
        UserDefinedTableSchema.Column onlyColumn = new UserDefinedTableSchema.Column(0);
        onlyColumn.setFieldName(tableField.getFieldName());
        onlyColumn.setAliasFieldName(tableField.getAliasFieldName());
        onlyColumn.setFieldTypeEnum(tableField.getFieldTypeEnum());
        onlyColumn.setFieldType(tableField.getFieldType());

        UserDefinedTableSchema singleColumnSchema = new UserDefinedTableSchema();
        singleColumnSchema.addColumn(onlyColumn);

        List<RawFieldDesc> convertedColumns = singleColumnSchema.toRawTableSchema().getFields();
        return convertedColumns.get(0);
    }

    /**
     * Converts raw column descriptions into fields and applies them through
     * {@link #updateWorkTableFields(String, List)}. Does nothing for a null or empty list.
     *
     * @param tableId       id of the work table
     * @param rawFieldDescs raw column descriptions
     */
    @Override
    public void updateWorkTableFieldsByRawDescs(String tableId, List<RawFieldDesc> rawFieldDescs) {
        if (CollectionUtils.isEmpty(rawFieldDescs)) {
            return;
        }

        List<Field> converted = new ArrayList<>();
        for (RawFieldDesc rawFieldDesc : rawFieldDescs) {
            converted.add(convertRawDescToField(tableId, rawFieldDesc, null));
        }
        updateWorkTableFields(tableId, converted);
    }

    /**
     * Converts a stored field row into a {@link TableField}.
     *
     * <p>Besides the bean copy, the type codes are turned into {@link FieldTypeEnum} values,
     * the JSON attribute string is parsed into a map, and the partition-key flag is treated as
     * {@code true} when it equals 1.
     *
     * @param fieldDO stored row; may be {@code null}
     * @return the converted field, or {@code null} when {@code fieldDO} is {@code null}
     */
    protected TableField convertToFieldVO(WorkTableFieldDO fieldDO) {
        if (null == fieldDO) {
            return null;
        }

        TableField converted = com.yuce.base.util.BeanUtils.copyObject(fieldDO, TableField.class, (row, vo) -> {
            vo.setStatus(row.getStatus());
            vo.setObjId(row.getTableId());
            vo.setId(row.getId());

            // Type codes -> enums.
            if (row.getFieldTypeEnum() != null) {
                vo.setFieldTypeEnum(FieldTypeEnum.fromCode(row.getFieldTypeEnum()));
            }
            if (row.getOrigFieldType() != null) {
                vo.setOrigFieldType(FieldTypeEnum.fromCode(row.getOrigFieldType()));
            }

            // JSON string -> attribute map.
            if (row.getAttrs() != null) {
                vo.setAttrs(JsonUtil.jsonToObject(row.getAttrs(), Map.class));
            }

            // Numeric flag -> boolean.
            if (row.getPartitionKey() != null) {
                vo.setPartitionKey(row.getPartitionKey() == 1);
            }
        });

        converted.setObjId(fieldDO.getTableId());
        return converted;
    }


    /**
     * Converts a field into a row object for the given table.
     *
     * <p>Besides the bean copy, a fresh id is generated when the field has none, the type enums
     * are stored as their codes, a non-empty attribute map is serialized to JSON, and the
     * partition-key flag is stored as 1 or 0.
     *
     * @param tableId id of the work table the row belongs to
     * @param field   field to convert
     * @return the row object
     */
    protected WorkTableFieldDO convertToTableFieldDO(String tableId, Field field) {
        return com.yuce.base.util.BeanUtils.copyObject(field, WorkTableFieldDO.class, (vo, row) -> {
            row.setStatus(vo.getStatus());
            row.setTableId(tableId);

            // Fields without an id get a generated one.
            if (vo.getId() == null) {
                row.setId(UUIDUtil.getID());
            }

            // Enums -> type codes.
            if (vo.getFieldTypeEnum() != null) {
                row.setFieldTypeEnum(vo.getFieldTypeEnum().getCode());
            }
            if (vo.getOrigFieldType() != null) {
                row.setOrigFieldType(vo.getOrigFieldType().getCode());
            }

            // Attribute map -> JSON string (only when there is something to store).
            if (!CollectionUtils.isEmpty(vo.getAttrs())) {
                row.setAttrs(JsonUtil.objectToJson(vo.getAttrs()));
            }

            // Boolean -> numeric flag.
            if (vo.getPartitionKey() != null) {
                row.setPartitionKey(vo.getPartitionKey() ? 1 : 0);
            }
        });
    }

    /**
     * Builds a new {@link TableField} from a raw column description, optionally overridden by a
     * migration mapping.
     *
     * <p>Without a mapping, name, alias and type all come from the column description. With a
     * mapping, its non-blank field name, non-blank comment (used as alias) and non-null field
     * type take precedence, and its source field name becomes the original field name. The
     * original field type always reflects the column description. The logical type attribute is
     * derived from the final field type: DATETIME and NUMBER map to their logical counterparts,
     * everything else to the string logical type.
     *
     * @param tableId              id of the work table the field belongs to
     * @param fieldDesc            raw column description
     * @param tableFieldMappingDTO optional mapping overrides; may be {@code null}
     * @return a new field with a freshly generated id
     */
    protected TableField convertRawDescToField(String tableId, RawFieldDesc fieldDesc, TableFieldMappingForMigrateDTO tableFieldMappingDTO) {
        final boolean hasMapping = tableFieldMappingDTO != null;

        TableField result = new TableField();
        result.setObjId(tableId);

        // Field name: the column name with brackets stripped, unless the mapping names the field.
        result.setFieldName(com.yuce.base.util.StringUtil.filterString(fieldDesc.getColumnName()));
        if (hasMapping && StringUtil.isNotBlank(tableFieldMappingDTO.getFieldName())) {
            result.setFieldName(tableFieldMappingDTO.getFieldName());
        }

        // Original field name: the mapping's source field when a mapping exists, else the column itself.
        String origFieldName = hasMapping ? tableFieldMappingDTO.getSrcFieldName() : fieldDesc.getColumnName();
        result.setOrigFieldName(origFieldName);

        // Alias: column comment when present, otherwise the column name.
        boolean hasColumnComment = !StringUtils.isEmpty(fieldDesc.getColumnComment());
        if (hasColumnComment) {
            result.setAliasFieldName(fieldDesc.getColumnComment());
        } else {
            result.setAliasFieldName(fieldDesc.getColumnName());
        }

        result.setAttr("indexType", fieldDesc.getColumnType());
        if (fieldDesc.isUniKey()) {
            result.setAttr("uniqRate", 1);
        }
        result.setComment(fieldDesc.getColumnComment());

        // A comment supplied by the mapping overrides the alias.
        if (hasMapping && StringUtil.isNotBlank(tableFieldMappingDTO.getFieldComment())) {
            result.setAliasFieldName(tableFieldMappingDTO.getFieldComment());
        }

        result.setPos(fieldDesc.getOrdinalPosition());
        result.setFieldType(fieldDesc.getColumnType());
        result.setSourceFieldId(fieldDesc.getSrcFieldId());

        // The original type comes from the column description, guessed from the column type if absent.
        FieldTypeEnum resolvedType = fieldDesc.getFieldTypeEnum();
        if (resolvedType == null) {
            resolvedType = FieldTypeEnum.guessEnumByType(fieldDesc.getColumnType());
        }
        result.setOrigFieldType(resolvedType);

        // A type supplied by the mapping overrides the effective type.
        if (hasMapping && tableFieldMappingDTO.getFieldTypeEnum() != null) {
            resolvedType = tableFieldMappingDTO.getFieldTypeEnum();
            result.setFieldType(resolvedType.getType());
        }

        result.setFieldTypeEnum(resolvedType);
        result.setId(UUIDUtil.getID());

        FieldLogicalTypeEnum logicalType;
        switch (resolvedType) {
            case DATETIME:
                logicalType = FieldLogicalTypeEnum.DATETIME;
                break;
            case NUMBER:
                logicalType = FieldLogicalTypeEnum.ALL_NUMBER;
                break;
            default:
                logicalType = FieldLogicalTypeEnum.ALL_STRING;
                break;
        }
        result.setAttr("fieldLogicalType", logicalType.getCode());

        return result;
    }

    /**
     * Loads the fields whose ids are contained in {@code fieldIds}.
     *
     * @param fieldIds ids of the fields to load; may be {@code null} or empty
     * @return the matching fields, each converted with {@code convertToFieldVO}; an empty list
     *         when no ids are supplied or no row matches
     */
    @Override
    public List<Field> loadFieldByIds(Set<String> fieldIds) {
        List<Field> fieldVOS = Lists.newArrayList();

        // An empty id collection would be rendered as an empty IN (...) clause, which databases
        // reject as invalid SQL; answer directly instead, as loadFieldByFieldNames does for names.
        if (CollectionUtils.isEmpty(fieldIds)) {
            return fieldVOS;
        }

        QueryWrapper<WorkTableFieldDO> queryWrapper = new QueryWrapper<>();
        queryWrapper.in("id", fieldIds);

        List<WorkTableFieldDO> fieldDOS = rawTableFieldDao.selectList(queryWrapper);
        for (WorkTableFieldDO fieldDO : fieldDOS) {
            fieldVOS.add(convertToFieldVO(fieldDO));
        }
        return fieldVOS;
    }

    /**
     * Evicts the cached field list of a table and then the cached entry of every field that
     * {@code getAllTableFields} returns for it.
     *
     * @param tableId id of the table whose field caches are cleared
     */
    @Override
    public void clearTableFieldCache(String tableId) {
        // The per-table entry is dropped before the field list is fetched.
        cacheManager.getCache(CacheConstants.CACHE_VALUES_WORK_TABLE_FIELDS).evictIfPresent(tableId);

        // NOTE(review): if getAllTableFields is cache-backed, this call may repopulate the entry
        // evicted just above — confirm against its implementation.
        List<Field> tableFields = getAllTableFields(tableId);
        if (!CollectionUtils.isEmpty(tableFields)) {
            for (Field tableField : tableFields) {
                cacheManager.getCache(CacheConstants.CACHE_VALUES_TABLE_FIELD).evictIfPresent(tableField.getId());
            }
        }
    }

    /**
     * Looks up a single field of a table by its field name.
     *
     * @param tableId   id of the table to search in
     * @param fieldName field name to match exactly
     * @return the first matching field converted with {@code convertToFieldVO}, or {@code null}
     *         when the query returns nothing
     */
    @Override
    public TableField loadFieldByFieldName(String tableId, String fieldName) {
        QueryWrapper<WorkTableFieldDO> criteria = new QueryWrapper<WorkTableFieldDO>()
                .eq("table_id", tableId)
                .eq("field_name", fieldName);

        List<WorkTableFieldDO> matches = rawTableFieldDao.selectList(criteria);
        if (CollectionUtils.isEmpty(matches)) {
            return null;
        }
        // Only the first row is used, even if several rows share the name.
        return convertToFieldVO(matches.get(0));
    }

    /**
     * Loads the fields of a table whose field names are contained in {@code fieldNames}.
     *
     * @param tableId    id of the table to search in
     * @param fieldNames field names to match; {@code null} or empty yields an empty result
     * @return the matching fields converted with {@code convertToFieldVO}; never {@code null}
     */
    @Override
    public List<TableField> loadFieldByFieldNames(String tableId, List<String> fieldNames) {
        // Without names there is nothing to query for.
        if (CollectionUtils.isEmpty(fieldNames)) {
            return Lists.newArrayList();
        }

        QueryWrapper<WorkTableFieldDO> criteria = new QueryWrapper<WorkTableFieldDO>()
                .eq("table_id", tableId)
                .in("field_name", fieldNames);
        List<WorkTableFieldDO> rows = rawTableFieldDao.selectList(criteria);

        if (!CollectionUtils.isEmpty(rows)) {
            return rows.stream().map(this::convertToFieldVO).collect(Collectors.toList());
        }
        return Lists.newArrayList();
    }

    /**
     * Adds a new column to a work table.
     *
     * <p>Steps, in order: reject a duplicate field name, add the column to the physical table,
     * insert the field metadata row positioned after the existing fields, refresh the table
     * metadata, propagate the field to related collections via
     * {@code addCollectionFieldFromAssertTableField}, and publish an UPDATE
     * {@link ResourceObjectEvent} for the table.
     *
     * <p>Returns without doing anything when no writer data source is available for the table's
     * storage medium, or when the physical table does not exist.
     *
     * @param tableId        id of the work table that receives the column
     * @param workTableField definition of the field to add
     * @throws IllegalArgumentException if the table already has a field with the same name
     */
    @Override
    public void addColumns(String tableId, Field workTableField) {
        List<Field> origFields = getAllTableFields(tableId);
        Set<String> origFieldNameSet = origFields.stream().map(Field::getFieldName).collect(Collectors.toSet());

        if (origFieldNameSet.contains(workTableField.getFieldName())) {
            throw new IllegalArgumentException("字段名已存在!");
        }

        WorkTable workTable = workTableService.loadWorkTableById(tableId);
        DataSourceConfig dataSourceConfig = dataSourceManager.getCubeWriterDataSource(workTable.getStorageMedium());

        if (dataSourceConfig == null) {
            return;
        }

        DBOperator dbOperator = DBOperatorFactory.getDBOperator(dataSourceConfig);
        if (!dbOperator.exists(workTable.getTableSchema(), workTable.getTableName())) {
            return;
        }

        // Add the column to the physical table exactly once, and before the metadata row is
        // written, so a failed DDL statement leaves no orphan metadata behind. The previous
        // code issued this call a second time after the insert, re-adding the same column.
        syncFieldToDb(workTable, workTableField.getFieldName(), workTableField.getFieldType(), workTableField.getFieldTypeEnum(), workTableField.getAliasFieldName());

        // The new field is positioned right after the existing ones.
        WorkTableFieldDO fieldDO = convertToTableFieldDO(tableId, workTableField);
        fieldDO.setPos(origFields.size() + 1);
        rawTableFieldDao.insert(fieldDO);

        // Refresh the schema metadata.
        TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());

        // Propagate the field to card collections created directly from the asset table.
        addCollectionFieldFromAssertTableField(tableId, workTableField);

        applicationContext.publishEvent(new ResourceObjectEvent(tableId, ThreadLocalUtils.getUserId(), CubeCategoryEnum.DATA_OBJECT, OperatorEnum.UPDATE));

    }

    /**
     * Copies a table field into every collection related to the table whose source is
     * {@link CollectionSourceEnum#ASSET_TABLE}. Each copy receives a new id, the id of the
     * target collection and the {@code STATUS_DISPLAY} status.
     *
     * @param tableId        id of the table whose related collections are looked up
     * @param workTableField table field that serves as the template for the collection fields
     */
    public void addCollectionFieldFromAssertTableField(String tableId, Field workTableField) {
        List<DataCollection> relatedCollections = dataCollectionService.getTableRelatedCollections(tableId);
        if (CollectionUtils.isEmpty(relatedCollections)) {
            return;
        }

        for (DataCollection related : relatedCollections) {
            // Collections that do not originate from an asset table are left untouched.
            if (!CollectionSourceEnum.ASSET_TABLE.equals(related.getCollectionSourceEnum())) {
                continue;
            }
            String targetCollectionId = related.getId();
            CollectionField fieldCopy = com.yuce.base.util.BeanUtils.copyObject(workTableField, CollectionField.class);
            fieldCopy.setId(UUIDUtil.getID());
            fieldCopy.setCollectionId(targetCollectionId);
            fieldCopy.setStatus(CollectionField.STATUS_DISPLAY);
            collectionFieldService.addCollectionField(targetCollectionId, fieldCopy, true);
        }
    }

    /**
     * Deletes the field rows of a table that carry the given field name.
     * Does nothing when either argument is blank.
     *
     * @param tableId   id of the table owning the field
     * @param fieldName name of the field to delete
     */
    @Override
    public void deleteFieldByFieldName(String tableId, String fieldName) {
        boolean argumentsPresent = !StringUtil.isBlank(tableId) && !StringUtil.isBlank(fieldName);
        if (argumentsPresent) {
            rawTableFieldDao.delete(new LambdaQueryWrapper<WorkTableFieldDO>()
                    .eq(WorkTableFieldDO::getTableId, tableId)
                    .eq(WorkTableFieldDO::getFieldName, fieldName));
        }
    }

    /**
     * Sets the status of every field row belonging to a table.
     * The call is ignored when {@code tableId} is blank, when {@code status} is {@code null},
     * or when {@code status} is neither 0 nor 1.
     *
     * @param tableId id of the table whose fields are updated
     * @param status  new status value; only 0 and 1 are accepted
     */
    @Override
    public void updateStatusByTableId(String tableId, Integer status) {
        if (StringUtil.isBlank(tableId) || status == null) {
            return;
        }
        boolean acceptedStatus = status == 0 || status == 1;
        if (!acceptedStatus) {
            return;
        }

        // Only the status property is populated on the entity handed to the update.
        WorkTableFieldDO statusPatch = new WorkTableFieldDO();
        statusPatch.setStatus(status);

        rawTableFieldDao.update(statusPatch,
                new LambdaQueryWrapper<WorkTableFieldDO>().eq(WorkTableFieldDO::getTableId, tableId));
    }

    /**
     * Updates the status of a single field identified by its id.
     *
     * @param tableId id of the owning table (not used by this implementation —
     *                NOTE(review): confirm whether the field should be checked against it)
     * @param fieldId id of the field to update
     * @param status  new status value
     * @throws ObjectNotExistException if no field with {@code fieldId} exists
     */
    @Override
    public void updateFieldStatus(String tableId, String fieldId, Integer status) {
        // Fail fast when the field cannot be found.
        if (rawTableFieldDao.selectById(fieldId) == null) {
            throw new ObjectNotExistException("表字段" + fieldId + "不存在");
        }

        // Only id and status are populated on the entity handed to updateById.
        WorkTableFieldDO statusPatch = new WorkTableFieldDO();
        statusPatch.setId(fieldId);
        statusPatch.setStatus(status);
        rawTableFieldDao.updateById(statusPatch);
    }

    /**
     * Delegates to the DAO's {@code updateFieldIdAndTableName} with the old field id, the new
     * field id and the field name. The call is skipped when any argument is blank.
     *
     * @param tableId    id of the owning table; only checked for blankness here
     * @param fieldId    current id of the field
     * @param newFieldId id the field should receive
     * @param fieldName  field name passed through to the DAO
     */
    @Override
    public void updateFieldIdAndTableName(String tableId, String fieldId, String newFieldId, String fieldName) {
        boolean anyBlank = StringUtil.isBlank(tableId)
                || StringUtil.isBlank(fieldId)
                || StringUtil.isBlank(newFieldId)
                || StringUtil.isBlank(fieldName);
        if (!anyBlank) {
            rawTableFieldDao.updateFieldIdAndTableName(fieldId, newFieldId, fieldName);
        }
    }

    /**
     * Builds one migration field mapping per field of the given table.
     *
     * <p>For each field the source field name is the original field name, falling back to the
     * current field name when the original one is blank; the comment is the alias field name.
     *
     * @param tableId id of the table whose fields are mapped
     * @return the mappings in field order; an empty list when {@code tableId} is blank or the
     *         table has no fields
     */
    @Override
    public List<TableFieldMappingForMigrateDTO> listTableFieldMappingForMigrateDTO(String tableId) {
        if (StringUtil.isBlank(tableId)) {
            return Lists.newArrayList();
        }
        List<Field> tableFields = getAllTableFields(tableId);
        if (CollectionUtils.isEmpty(tableFields)) {
            return Lists.newArrayList();
        }

        return tableFields.stream()
                .map(tableField -> {
                    String originalName = tableField.getOrigFieldName();
                    String sourceName = StringUtil.isBlank(originalName) ? tableField.getFieldName() : originalName;
                    return TableFieldMappingForMigrateDTO.builder()
                            .srcFieldName(sourceName)
                            .fieldComment(tableField.getAliasFieldName())
                            .fieldTypeEnum(tableField.getFieldTypeEnum())
                            .fieldName(tableField.getFieldName())
                            .build();
                })
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Lists, for every {@link FieldTypeEnum} value, the StarRocks storage types offered for it.
     *
     * @return a map with one entry per {@link FieldTypeEnum} value; values without an explicit
     *         mapping below are associated with an empty list
     */
    @Override
    public Map<FieldTypeEnum, List<StarrocksFieldTypeEnum>> getStarRocksStoreType() {
        Map<FieldTypeEnum, List<StarrocksFieldTypeEnum>> storeTypesByFieldType = new HashMap<>();
        for (FieldTypeEnum fieldType : FieldTypeEnum.values()) {
            List<StarrocksFieldTypeEnum> storeTypes;
            switch (fieldType) {
                case DATE:
                    storeTypes = Lists.newArrayList(StarrocksFieldTypeEnum.DATE);
                    break;
                case DATETIME:
                    storeTypes = Lists.newArrayList(StarrocksFieldTypeEnum.DATETIME);
                    break;
                case STRING:
                    storeTypes = Lists.newArrayList(StarrocksFieldTypeEnum.JSON, StarrocksFieldTypeEnum.VARCHAR);
                    break;
                case NUMBER:
                    storeTypes = Lists.newArrayList(StarrocksFieldTypeEnum.BIGINT, StarrocksFieldTypeEnum.DECIMAL, StarrocksFieldTypeEnum.DOUBLE);
                    break;
                case GEOMETRY:
                    storeTypes = Lists.newArrayList(StarrocksFieldTypeEnum.BINARY);
                    break;
                default:
                    // Any other field type has no storage type mapped.
                    storeTypes = new ArrayList<>();
                    break;
            }
            storeTypesByFieldType.put(fieldType, storeTypes);
        }
        return storeTypesByFieldType;
    }

    /**
     * Adds a column to the physical table behind a work table and refreshes the table metadata.
     *
     * <p>Tables stored in {@code DATA_LAKE} go through the Dremio operator and the Dremio schema
     * name; every other storage medium goes through the StarRocks operator and the work table's
     * own schema.
     *
     * @param workTable     work table whose physical table is altered
     * @param fieldName     name of the column to add
     * @param fieldType     textual type of the column
     * @param fieldTypeEnum type enum of the column
     * @param fieldComment  comment of the column, used as its alias in the descriptor
     */
    private void syncFieldToDb(WorkTable workTable, String fieldName, String fieldType, FieldTypeEnum fieldTypeEnum, String fieldComment) {
        // Build the descriptor once; both storage branches add exactly this column. The previous
        // code built an identical second descriptor in the StarRocks branch and left this one unused.
        RawFieldDesc fieldDesc = getRawFieldDesc(fieldName, fieldTypeEnum, fieldComment, fieldType);
        if (workTable.getStorageMedium() == TableStorageMediumEnum.DATA_LAKE) {
            DBOperator dbOperator = CubeSourceFactory.getDremioDBOperator(-1);
            dbOperator.addField(CubeSourceFactory.getDremioTableSchemaName(workTable.getStorageMedium()), workTable.getTableName(), fieldDesc);
        } else {
            DBOperator dbOperator = CubeSourceFactory.getStarrocksDBOperator(workTable.getStorageMedium(), -1);
            dbOperator.addField(workTable.getTableSchema(), workTable.getTableName(), fieldDesc);
        }

        TableOperator.refreshMeta(workTable.getStorageMedium(), workTable.getTableName());

    }



    /**
     * Duplicates every field row of the table identified by {@code tableId}, giving each copy a
     * new id and assigning it to the table identified by {@code sourceTableId}.
     *
     * <p>NOTE(review): the rows are read from {@code tableId} and written to
     * {@code sourceTableId}, which looks inverted relative to the parameter names — confirm
     * against the callers.
     *
     * @param tableId       id of the table whose field rows are read
     * @param sourceTableId id of the table the copied rows are assigned to
     */
    @Override
    public void copyTableFiled(String tableId, String sourceTableId) {
        List<WorkTableFieldDO> rowsToCopy = rawTableFieldDao.selectList(
                new LambdaQueryWrapper<WorkTableFieldDO>().eq(WorkTableFieldDO::getTableId, tableId));

        for (WorkTableFieldDO row : rowsToCopy) {
            WorkTableFieldDO duplicate = new WorkTableFieldDO();
            BeanUtils.copyProperties(row, duplicate);
            // Re-key the copy so it becomes an independent row of the other table.
            duplicate.setId(UUIDUtil.getID());
            duplicate.setTableId(sourceTableId);
            rawTableFieldDao.insert(duplicate);
        }
    }


    /**
     * Produces a raw field descriptor by wrapping the given attributes in a single-column
     * {@link UserDefinedTableSchema} and converting that schema to its raw form.
     *
     * @param fieldName     name of the column
     * @param fieldTypeEnum type enum of the column
     * @param fieldComment  comment of the column, stored as its alias field name
     * @param fieldType     textual type of the column
     * @return the first (and only) field of the converted raw schema
     */
    private RawFieldDesc getRawFieldDesc(String fieldName, FieldTypeEnum fieldTypeEnum, String fieldComment, String fieldType) {
        UserDefinedTableSchema.Column onlyColumn = new UserDefinedTableSchema.Column(0);
        onlyColumn.setFieldName(fieldName);
        onlyColumn.setAliasFieldName(fieldComment);
        onlyColumn.setFieldTypeEnum(fieldTypeEnum);
        onlyColumn.setFieldType(fieldType);

        UserDefinedTableSchema singleColumnSchema = new UserDefinedTableSchema();
        singleColumnSchema.addColumn(onlyColumn);
        return singleColumnSchema.toRawTableSchema().getFields().get(0);
    }
}