diff --git a/internal/dms/pkg/constant/const.go b/internal/dms/pkg/constant/const.go index 187416003..86362553a 100644 --- a/internal/dms/pkg/constant/const.go +++ b/internal/dms/pkg/constant/const.go @@ -256,6 +256,8 @@ func ParseDBType(s string) (DBType, error) { return DBTypeHANA, nil case "PolarDB For MySQL": return DBTypePolarDBForMySQL, nil + case "MongoDB": + return DBTypeMongoDB, nil case "OceanBase For Oracle": return DBTypeOceanBaseOracle, nil @@ -281,6 +283,7 @@ const ( DBTypeGaussDBForMySQL DBType = "GaussDB for MySQL" DBTypeHANA DBType = "HANA" DBTypePolarDBForMySQL DBType = "PolarDB For MySQL" + DBTypeMongoDB DBType = "MongoDB" DBTypeOceanBaseOracle DBType = "OceanBase For Oracle" ) diff --git a/internal/dms/pkg/constant/const_test.go b/internal/dms/pkg/constant/const_test.go index 71c14a9bd..eb1610757 100644 --- a/internal/dms/pkg/constant/const_test.go +++ b/internal/dms/pkg/constant/const_test.go @@ -7,10 +7,10 @@ import ( func TestCheckDBTypeIfDataExportSupported_NewTypes(t *testing.T) { // 验证新增的数据源应在白名单中 newTypes := map[string]bool{ - "TiDB": true, - "TDSQL For InnoDB": true, - "GoldenDB": true, - "TBase": true, + "TiDB": true, + "TDSQL For InnoDB": true, + "GoldenDB": true, + "TBase": true, // Issue #2868: GaussDB (PostgreSQL 协议) 与 GaussDB for MySQL (MySQL 协议) 是两个独立产品 "GaussDB": true, // PostgreSQL 协议 GaussDB / openGauss, 走 opengauss-connector-go-pq 驱动 "GaussDB for MySQL": true, // 华为云 GaussDB(for MySQL), 走 MySQL 驱动 @@ -51,9 +51,9 @@ func TestCheckDBTypeIfDataExportSupported_ExistingTypes(t *testing.T) { func TestParseDBType_PolarDB(t *testing.T) { cases := map[string]struct { - input string - wantDBType DBType - wantErr bool + input string + wantDBType DBType + wantErr bool }{ "valid PolarDB For MySQL": { input: "PolarDB For MySQL", @@ -109,28 +109,29 @@ func TestParseDBType(t *testing.T) { expected DBType expectError bool }{ - "MySQL": {input: "MySQL", expected: DBTypeMySQL}, - "TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: DBTypeTDSQLForInnoDB}, - "TiDB": {input: "TiDB", expected: DBTypeTiDB}, - "PostgreSQL": {input: "PostgreSQL", expected: DBTypePostgreSQL}, - "Oracle": {input: "Oracle", expected: DBTypeOracle}, - "DB2": {input: "DB2", expected: DBTypeDB2}, - "SQL Server": {input: "SQL Server", expected: DBTypeSQLServer}, + "MySQL": {input: "MySQL", expected: DBTypeMySQL}, + "TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: DBTypeTDSQLForInnoDB}, + "TiDB": {input: "TiDB", expected: DBTypeTiDB}, + "PostgreSQL": {input: "PostgreSQL", expected: DBTypePostgreSQL}, + "Oracle": {input: "Oracle", expected: DBTypeOracle}, + "DB2": {input: "DB2", expected: DBTypeDB2}, + "SQL Server": {input: "SQL Server", expected: DBTypeSQLServer}, "OceanBase For MySQL": {input: "OceanBase For MySQL", expected: DBTypeOceanBaseMySQL}, - "GoldenDB": {input: "GoldenDB", expected: DBTypeGoldenDB}, - "TBase": {input: "TBase", expected: DBTypeTBase}, - "Hive": {input: "Hive", expected: DBTypeHive}, - "DM": {input: "DM", expected: DBTypeDM}, + "GoldenDB": {input: "GoldenDB", expected: DBTypeGoldenDB}, + "TBase": {input: "TBase", expected: DBTypeTBase}, + "Hive": {input: "Hive", expected: DBTypeHive}, + "DM": {input: "DM", expected: DBTypeDM}, // Issue #2868: 拆分 GaussDB / GaussDB for MySQL 为两个独立产品 - "GaussDB": {input: "GaussDB", expected: DBTypeGaussDB}, - "GaussDB for MySQL": {input: "GaussDB for MySQL", expected: DBTypeGaussDBForMySQL}, - "HANA": {input: "HANA", expected: DBTypeHANA}, + "GaussDB": {input: "GaussDB", expected: DBTypeGaussDB}, + "GaussDB for MySQL": {input: "GaussDB for MySQL", expected: DBTypeGaussDBForMySQL}, + "HANA": {input: "HANA", expected: DBTypeHANA}, // PolarDB-MySQL 新增 (Issue #826) - "PolarDB For MySQL": {input: "PolarDB For MySQL", expected: DBTypePolarDBForMySQL}, + "PolarDB For MySQL": {input: "PolarDB For MySQL", expected: DBTypePolarDBForMySQL}, + "MongoDB": {input: "MongoDB", expected: DBTypeMongoDB}, // "PolarDB" 单独不应匹配 - "PolarDB only": {input: "PolarDB", expectError: true}, - "invalid type": {input: "UnknownDB", expectError: true}, - "empty string": {input: "", expectError: true}, + "PolarDB only": {input: "PolarDB", expectError: true}, + "invalid type": {input: "UnknownDB", expectError: true}, + "empty string": {input: "", expectError: true}, } for name, tc := range tests { diff --git a/internal/sql_workbench/client/sql_workbench_client.go b/internal/sql_workbench/client/sql_workbench_client.go index 082e6b8be..cd46109c7 100644 --- a/internal/sql_workbench/client/sql_workbench_client.go +++ b/internal/sql_workbench/client/sql_workbench_client.go @@ -398,7 +398,7 @@ func (c *SqlWorkbenchClient) CreateDatasources(datasource CreateDatasourceReques // 检查HTTP状态码 if resp.StatusCode != http.StatusOK { c.log.Errorf("Create datasource failed with status code: %d, response: %s", resp.StatusCode, string(body)) - return nil, fmt.Errorf("create datasource failed with status code: %d", resp.StatusCode) + return nil, fmt.Errorf("create datasource failed with status code: %d, response: %s", resp.StatusCode, string(body)) } // 解析响应 diff --git a/internal/sql_workbench/service/sql_workbench_service.go b/internal/sql_workbench/service/sql_workbench_service.go index 300ead120..bd8da2521 100644 --- a/internal/sql_workbench/service/sql_workbench_service.go +++ b/internal/sql_workbench/service/sql_workbench_service.go @@ -851,8 +851,20 @@ type datasourceBaseInfo struct { ServiceName *string EnvironmentID int64 DefaultSchema *string + Properties interface{} + JDBCParams map[string]interface{} } +const ( + mongoDefaultDatabaseParam = "default_database" + mongoAuthDatabaseParam = "auth_source" + mongoAuthMechanismParam = "auth_mechanism" + mongoReplicaSetParam = "replica_set" + mongoTLSEnabledParam = "tls" + mongoDirectConnectionParam = "direct_connection" + mongoTLSSkipVerifyParam = "tls_skip_verify" +) + // buildDatasourceBaseInfo 构建数据源基础信息 func (sqlWorkbenchService *SqlWorkbenchService) buildDatasourceBaseInfo(ctx context.Context, dbService *biz.DBService, environmentID int64) (*datasourceBaseInfo, error) { datasourceName, err := sqlWorkbenchService.buildDatasourceName(ctx, dbService) @@ -876,6 +888,10 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildDatasourceBaseInfo(ctx cont baseInfo.ServiceName = &serviceName } + if dbService.DBType == string(pkgConst.DBTypeMongoDB) { + baseInfo.DefaultSchema, baseInfo.Properties, baseInfo.JDBCParams = buildMongoDatasourceOptions(dbService) + } + return baseInfo, nil } @@ -887,17 +903,19 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildCreateDatasourceRequest(ctx } return client.CreateDatasourceRequest{ - CreatorID: sqlWorkbenchUser.SqlWorkbenchUserId, - Type: baseInfo.Type, - Name: baseInfo.Name, - Username: baseInfo.Username, - Password: baseInfo.Password, - Host: baseInfo.Host, - Port: baseInfo.Port, - ServiceName: baseInfo.ServiceName, - SSLConfig: client.SSLConfig{Enabled: false}, - EnvironmentID: baseInfo.EnvironmentID, - DefaultSchema: baseInfo.DefaultSchema, + CreatorID: sqlWorkbenchUser.SqlWorkbenchUserId, + Type: baseInfo.Type, + Name: baseInfo.Name, + Username: baseInfo.Username, + Password: baseInfo.Password, + Host: baseInfo.Host, + Port: baseInfo.Port, + ServiceName: baseInfo.ServiceName, + SSLConfig: client.SSLConfig{Enabled: false}, + Properties: baseInfo.Properties, + EnvironmentID: baseInfo.EnvironmentID, + JdbcURLParameters: baseInfo.JDBCParams, + DefaultSchema: baseInfo.DefaultSchema, }, nil } @@ -909,16 +927,18 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildUpdateDatasourceRequest(ctx } return client.UpdateDatasourceRequest{ - Type: baseInfo.Type, - Name: &baseInfo.Name, - Username: baseInfo.Username, - Password: &baseInfo.Password, - Host: baseInfo.Host, - Port: baseInfo.Port, - ServiceName: baseInfo.ServiceName, - SSLConfig: client.SSLConfig{Enabled: false}, - EnvironmentID: baseInfo.EnvironmentID, - DefaultSchema: baseInfo.DefaultSchema, + Type: baseInfo.Type, + Name: &baseInfo.Name, + Username: baseInfo.Username, + Password: &baseInfo.Password, + Host: baseInfo.Host, + Port: baseInfo.Port, + ServiceName: baseInfo.ServiceName, + SSLConfig: client.SSLConfig{Enabled: false}, + Properties: interfacePtr(baseInfo.Properties), + EnvironmentID: baseInfo.EnvironmentID, + JdbcURLParameters: mapPtr(baseInfo.JDBCParams), + DefaultSchema: baseInfo.DefaultSchema, }, nil } @@ -950,6 +970,8 @@ func (sqlWorkbenchService *SqlWorkbenchService) convertDBType(dmsDBType string) return "MYSQL" case "PolarDB For MySQL": return "MYSQL" + case "MongoDB": + return "MONGODB" default: return dmsDBType } @@ -966,6 +988,57 @@ func (sqlWorkbenchService *SqlWorkbenchService) SupportDBType(dbType pkgConst.DB dbType == pkgConst.DBTypePolarDBForMySQL } +func buildMongoDatasourceOptions(dbService *biz.DBService) (*string, interface{}, map[string]interface{}) { + defaultDatabase := dbService.AdditionalParams.GetParam(mongoDefaultDatabaseParam).String() + var defaultSchema *string + if defaultDatabase != "" { + defaultSchema = &defaultDatabase + } + + jdbcParams := map[string]interface{}{} + if authDB := dbService.AdditionalParams.GetParam(mongoAuthDatabaseParam).String(); authDB != "" { + jdbcParams["authSource"] = authDB + } + if authMechanism := dbService.AdditionalParams.GetParam(mongoAuthMechanismParam).String(); authMechanism != "" { + jdbcParams["authMechanism"] = authMechanism + } + if replicaSet := dbService.AdditionalParams.GetParam(mongoReplicaSetParam).String(); replicaSet != "" { + jdbcParams["replicaSet"] = replicaSet + } + if tlsParam := dbService.AdditionalParams.GetParam(mongoTLSEnabledParam); tlsParam != nil && tlsParam.String() != "" { + if tlsParam.Bool() { + jdbcParams["tls"] = "true" + } else { + jdbcParams["tls"] = "false" + } + } + if dbService.AdditionalParams.GetParam(mongoDirectConnectionParam).Bool() { + jdbcParams["directConnection"] = true + } + if dbService.AdditionalParams.GetParam(mongoTLSSkipVerifyParam).Bool() { + jdbcParams["tlsInsecure"] = true + } + + if len(jdbcParams) == 0 { + return defaultSchema, nil, nil + } + return defaultSchema, nil, jdbcParams +} + +func interfacePtr(v interface{}) *interface{} { + if v == nil { + return nil + } + return &v +} + +func mapPtr(v map[string]interface{}) *map[string]interface{} { + if len(v) == 0 { + return nil + } + return &v +} + // buildDatabaseUser 当是ob-mysql时需要给账号管理的账号附加租户名集群名等字符: root@oms_mysql#oms_resource_4250 func buildDatabaseUser(account string, dbServiceUser string, dbType string) string { if dbType == string(pkgConst.DBTypeOceanBaseMySQL) { diff --git a/internal/sql_workbench/service/sql_workbench_service_test.go b/internal/sql_workbench/service/sql_workbench_service_test.go index 6c65ba0a0..924b35946 100644 --- a/internal/sql_workbench/service/sql_workbench_service_test.go +++ b/internal/sql_workbench/service/sql_workbench_service_test.go @@ -3,7 +3,9 @@ package sql_workbench import ( "testing" + "github.com/actiontech/dms/internal/dms/biz" pkgConst "github.com/actiontech/dms/internal/dms/pkg/constant" + pkgParams "github.com/actiontech/dms/pkg/params" ) func Test_convertDBType(t *testing.T) { @@ -12,17 +14,18 @@ func Test_convertDBType(t *testing.T) { input string expected string }{ - "DM": {input: "达梦(DM)", expected: "DM"}, - "MySQL": {input: "MySQL", expected: "MYSQL"}, - "PostgreSQL": {input: "PostgreSQL", expected: "POSTGRESQL"}, - "Oracle": {input: "Oracle", expected: "ORACLE"}, - "SQL Server": {input: "SQL Server", expected: "SQL_SERVER"}, - "OB Oracle": {input: "OceanBase For Oracle", expected: "OB_ORACLE"}, - "OB MySQL": {input: "OceanBase For MySQL", expected: "OB_MYSQL"}, + "DM": {input: "达梦(DM)", expected: "DM"}, + "MySQL": {input: "MySQL", expected: "MYSQL"}, + "PostgreSQL": {input: "PostgreSQL", expected: "POSTGRESQL"}, + "Oracle": {input: "Oracle", expected: "ORACLE"}, + "SQL Server": {input: "SQL Server", expected: "SQL_SERVER"}, + "OB Oracle": {input: "OceanBase For Oracle", expected: "OB_ORACLE"}, + "OB MySQL": {input: "OceanBase For MySQL", expected: "OB_MYSQL"}, "TiDB": {input: "TiDB", expected: "TIDB"}, - "TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: "MYSQL"}, + "TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: "MYSQL"}, "GoldenDB": {input: "GoldenDB", expected: "MYSQL"}, "PolarDB For MySQL": {input: "PolarDB For MySQL", expected: "MYSQL"}, + "MongoDB": {input: "MongoDB", expected: "MONGODB"}, "Unknown passthrough": {input: "UnknownDB", expected: "UnknownDB"}, } for name, tc := range cases { @@ -41,16 +44,17 @@ func Test_SupportDBType(t *testing.T) { input pkgConst.DBType expected bool }{ - "DM supported": {input: pkgConst.DBTypeDM, expected: true}, - "MySQL supported": {input: pkgConst.DBTypeMySQL, expected: true}, - "Oracle supported": {input: pkgConst.DBTypeOracle, expected: true}, - "OB MySQL supported": {input: pkgConst.DBTypeOceanBaseMySQL, expected: true}, - "TiDB supported": {input: pkgConst.DBTypeTiDB, expected: true}, - "TDSQL supported": {input: pkgConst.DBTypeTDSQLForInnoDB, expected: true}, - "GoldenDB supported": {input: pkgConst.DBTypeGoldenDB, expected: true}, - "PostgreSQL unsupported": {input: pkgConst.DBTypePostgreSQL, expected: false}, - "SQL Server unsupported": {input: pkgConst.DBTypeSQLServer, expected: false}, - "PolarDB For MySQL supported": {input: pkgConst.DBTypePolarDBForMySQL, expected: true}, + "DM supported": {input: pkgConst.DBTypeDM, expected: true}, + "MySQL supported": {input: pkgConst.DBTypeMySQL, expected: true}, + "Oracle supported": {input: pkgConst.DBTypeOracle, expected: true}, + "OB MySQL supported": {input: pkgConst.DBTypeOceanBaseMySQL, expected: true}, + "TiDB supported": {input: pkgConst.DBTypeTiDB, expected: true}, + "TDSQL supported": {input: pkgConst.DBTypeTDSQLForInnoDB, expected: true}, + "GoldenDB supported": {input: pkgConst.DBTypeGoldenDB, expected: true}, + "MongoDB unsupported": {input: pkgConst.DBTypeMongoDB, expected: false}, + "PostgreSQL unsupported": {input: pkgConst.DBTypePostgreSQL, expected: false}, + "SQL Server unsupported": {input: pkgConst.DBTypeSQLServer, expected: false}, + "PolarDB For MySQL supported": {input: pkgConst.DBTypePolarDBForMySQL, expected: true}, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -61,3 +65,58 @@ func Test_SupportDBType(t *testing.T) { }) } } + +func Test_buildMongoDatasourceOptions(t *testing.T) { + defaultDB := "appdb" + defaultSchema, propertiesValue, jdbcParams := buildMongoDatasourceOptions(&biz.DBService{ + DBType: string(pkgConst.DBTypeMongoDB), + Host: "127.0.0.1", + Port: "27017", + AdditionalParams: pkgParams.Params{ + &pkgParams.Param{Key: mongoDefaultDatabaseParam, Value: defaultDB, Type: pkgParams.ParamTypeString}, + &pkgParams.Param{Key: mongoAuthDatabaseParam, Value: "admin", Type: pkgParams.ParamTypeString}, + &pkgParams.Param{Key: mongoAuthMechanismParam, Value: "SCRAM-SHA-256", Type: pkgParams.ParamTypeString}, + &pkgParams.Param{Key: mongoReplicaSetParam, Value: "rs0", Type: pkgParams.ParamTypeString}, + &pkgParams.Param{Key: mongoTLSEnabledParam, Value: "true", Type: pkgParams.ParamTypeBool}, + &pkgParams.Param{Key: mongoDirectConnectionParam, Value: "true", Type: pkgParams.ParamTypeBool}, + &pkgParams.Param{Key: mongoTLSSkipVerifyParam, Value: "true", Type: pkgParams.ParamTypeBool}, + }, + }) + if defaultSchema == nil || *defaultSchema != defaultDB { + t.Fatalf("unexpected default schema: %#v", defaultSchema) + } + if propertiesValue != nil { + t.Fatalf("expected nil properties, got %#v", propertiesValue) + } + if jdbcParams["authSource"] != "admin" { + t.Fatalf("unexpected authSource: %#v", jdbcParams["authSource"]) + } + if jdbcParams["authMechanism"] != "SCRAM-SHA-256" { + t.Fatalf("unexpected authMechanism: %#v", jdbcParams["authMechanism"]) + } + if jdbcParams["replicaSet"] != "rs0" { + t.Fatalf("unexpected replicaSet: %#v", jdbcParams["replicaSet"]) + } + if jdbcParams["tls"] != "true" { + t.Fatalf("unexpected tls: %#v", jdbcParams["tls"]) + } + if jdbcParams["directConnection"] != true || jdbcParams["tlsInsecure"] != true { + t.Fatalf("unexpected jdbc params: %#v", jdbcParams) + } +} + +func Test_buildMongoDatasourceOptions_tlsOnly(t *testing.T) { + _, propertiesValue, jdbcParams := buildMongoDatasourceOptions(&biz.DBService{ + DBType: string(pkgConst.DBTypeMongoDB), + AdditionalParams: pkgParams.Params{ + &pkgParams.Param{Key: mongoTLSEnabledParam, Value: "true", Type: pkgParams.ParamTypeBool}, + }, + }) + if propertiesValue != nil { + t.Fatalf("expected nil properties, got %#v", propertiesValue) + } + if jdbcParams["tls"] != "true" { + t.Fatalf("expected tls in jdbcUrlParameters when only tls is configured, got %#v", jdbcParams) + } +} +