Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/dms/pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func ParseDBType(s string) (DBType, error) {
return DBTypeHANA, nil
case "PolarDB For MySQL":
return DBTypePolarDBForMySQL, nil
case "MongoDB":
return DBTypeMongoDB, nil
case "OceanBase For Oracle":
return DBTypeOceanBaseOracle, nil

Expand All @@ -281,6 +283,7 @@ const (
DBTypeGaussDBForMySQL DBType = "GaussDB for MySQL"
DBTypeHANA DBType = "HANA"
DBTypePolarDBForMySQL DBType = "PolarDB For MySQL"
DBTypeMongoDB DBType = "MongoDB"
DBTypeOceanBaseOracle DBType = "OceanBase For Oracle"
)

Expand Down
51 changes: 26 additions & 25 deletions internal/dms/pkg/constant/const_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
func TestCheckDBTypeIfDataExportSupported_NewTypes(t *testing.T) {
// 验证新增的数据源应在白名单中
newTypes := map[string]bool{
"TiDB": true,
"TDSQL For InnoDB": true,
"GoldenDB": true,
"TBase": true,
"TiDB": true,
"TDSQL For InnoDB": true,
"GoldenDB": true,
"TBase": true,
// Issue #2868: GaussDB (PostgreSQL 协议) 与 GaussDB for MySQL (MySQL 协议) 是两个独立产品
"GaussDB": true, // PostgreSQL 协议 GaussDB / openGauss, 走 opengauss-connector-go-pq 驱动
"GaussDB for MySQL": true, // 华为云 GaussDB(for MySQL), 走 MySQL 驱动
Expand Down Expand Up @@ -51,9 +51,9 @@ func TestCheckDBTypeIfDataExportSupported_ExistingTypes(t *testing.T) {

func TestParseDBType_PolarDB(t *testing.T) {
cases := map[string]struct {
input string
wantDBType DBType
wantErr bool
input string
wantDBType DBType
wantErr bool
}{
"valid PolarDB For MySQL": {
input: "PolarDB For MySQL",
Expand Down Expand Up @@ -109,28 +109,29 @@ func TestParseDBType(t *testing.T) {
expected DBType
expectError bool
}{
"MySQL": {input: "MySQL", expected: DBTypeMySQL},
"TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: DBTypeTDSQLForInnoDB},
"TiDB": {input: "TiDB", expected: DBTypeTiDB},
"PostgreSQL": {input: "PostgreSQL", expected: DBTypePostgreSQL},
"Oracle": {input: "Oracle", expected: DBTypeOracle},
"DB2": {input: "DB2", expected: DBTypeDB2},
"SQL Server": {input: "SQL Server", expected: DBTypeSQLServer},
"MySQL": {input: "MySQL", expected: DBTypeMySQL},
"TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: DBTypeTDSQLForInnoDB},
"TiDB": {input: "TiDB", expected: DBTypeTiDB},
"PostgreSQL": {input: "PostgreSQL", expected: DBTypePostgreSQL},
"Oracle": {input: "Oracle", expected: DBTypeOracle},
"DB2": {input: "DB2", expected: DBTypeDB2},
"SQL Server": {input: "SQL Server", expected: DBTypeSQLServer},
"OceanBase For MySQL": {input: "OceanBase For MySQL", expected: DBTypeOceanBaseMySQL},
"GoldenDB": {input: "GoldenDB", expected: DBTypeGoldenDB},
"TBase": {input: "TBase", expected: DBTypeTBase},
"Hive": {input: "Hive", expected: DBTypeHive},
"DM": {input: "DM", expected: DBTypeDM},
"GoldenDB": {input: "GoldenDB", expected: DBTypeGoldenDB},
"TBase": {input: "TBase", expected: DBTypeTBase},
"Hive": {input: "Hive", expected: DBTypeHive},
"DM": {input: "DM", expected: DBTypeDM},
// Issue #2868: 拆分 GaussDB / GaussDB for MySQL 为两个独立产品
"GaussDB": {input: "GaussDB", expected: DBTypeGaussDB},
"GaussDB for MySQL": {input: "GaussDB for MySQL", expected: DBTypeGaussDBForMySQL},
"HANA": {input: "HANA", expected: DBTypeHANA},
"GaussDB": {input: "GaussDB", expected: DBTypeGaussDB},
"GaussDB for MySQL": {input: "GaussDB for MySQL", expected: DBTypeGaussDBForMySQL},
"HANA": {input: "HANA", expected: DBTypeHANA},
// PolarDB-MySQL 新增 (Issue #826)
"PolarDB For MySQL": {input: "PolarDB For MySQL", expected: DBTypePolarDBForMySQL},
"PolarDB For MySQL": {input: "PolarDB For MySQL", expected: DBTypePolarDBForMySQL},
"MongoDB": {input: "MongoDB", expected: DBTypeMongoDB},
// "PolarDB" 单独不应匹配
"PolarDB only": {input: "PolarDB", expectError: true},
"invalid type": {input: "UnknownDB", expectError: true},
"empty string": {input: "", expectError: true},
"PolarDB only": {input: "PolarDB", expectError: true},
"invalid type": {input: "UnknownDB", expectError: true},
"empty string": {input: "", expectError: true},
}

for name, tc := range tests {
Expand Down
2 changes: 1 addition & 1 deletion internal/sql_workbench/client/sql_workbench_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (c *SqlWorkbenchClient) CreateDatasources(datasource CreateDatasourceReques
// 检查HTTP状态码
if resp.StatusCode != http.StatusOK {
c.log.Errorf("Create datasource failed with status code: %d, response: %s", resp.StatusCode, string(body))
return nil, fmt.Errorf("create datasource failed with status code: %d", resp.StatusCode)
return nil, fmt.Errorf("create datasource failed with status code: %d, response: %s", resp.StatusCode, string(body))
}

// 解析响应
Expand Down
115 changes: 94 additions & 21 deletions internal/sql_workbench/service/sql_workbench_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,20 @@ type datasourceBaseInfo struct {
ServiceName *string
EnvironmentID int64
DefaultSchema *string
Properties interface{}
JDBCParams map[string]interface{}
}

const (
mongoDefaultDatabaseParam = "default_database"
mongoAuthDatabaseParam = "auth_source"
mongoAuthMechanismParam = "auth_mechanism"
mongoReplicaSetParam = "replica_set"
mongoTLSEnabledParam = "tls"
mongoDirectConnectionParam = "direct_connection"
mongoTLSSkipVerifyParam = "tls_skip_verify"
)

// buildDatasourceBaseInfo 构建数据源基础信息
func (sqlWorkbenchService *SqlWorkbenchService) buildDatasourceBaseInfo(ctx context.Context, dbService *biz.DBService, environmentID int64) (*datasourceBaseInfo, error) {
datasourceName, err := sqlWorkbenchService.buildDatasourceName(ctx, dbService)
Expand All @@ -876,6 +888,10 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildDatasourceBaseInfo(ctx cont
baseInfo.ServiceName = &serviceName
}

if dbService.DBType == string(pkgConst.DBTypeMongoDB) {
baseInfo.DefaultSchema, baseInfo.Properties, baseInfo.JDBCParams = buildMongoDatasourceOptions(dbService)
}

return baseInfo, nil
}

Expand All @@ -887,17 +903,19 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildCreateDatasourceRequest(ctx
}

return client.CreateDatasourceRequest{
CreatorID: sqlWorkbenchUser.SqlWorkbenchUserId,
Type: baseInfo.Type,
Name: baseInfo.Name,
Username: baseInfo.Username,
Password: baseInfo.Password,
Host: baseInfo.Host,
Port: baseInfo.Port,
ServiceName: baseInfo.ServiceName,
SSLConfig: client.SSLConfig{Enabled: false},
EnvironmentID: baseInfo.EnvironmentID,
DefaultSchema: baseInfo.DefaultSchema,
CreatorID: sqlWorkbenchUser.SqlWorkbenchUserId,
Type: baseInfo.Type,
Name: baseInfo.Name,
Username: baseInfo.Username,
Password: baseInfo.Password,
Host: baseInfo.Host,
Port: baseInfo.Port,
ServiceName: baseInfo.ServiceName,
SSLConfig: client.SSLConfig{Enabled: false},
Properties: baseInfo.Properties,
EnvironmentID: baseInfo.EnvironmentID,
JdbcURLParameters: baseInfo.JDBCParams,
DefaultSchema: baseInfo.DefaultSchema,
}, nil
}

Expand All @@ -909,16 +927,18 @@ func (sqlWorkbenchService *SqlWorkbenchService) buildUpdateDatasourceRequest(ctx
}

return client.UpdateDatasourceRequest{
Type: baseInfo.Type,
Name: &baseInfo.Name,
Username: baseInfo.Username,
Password: &baseInfo.Password,
Host: baseInfo.Host,
Port: baseInfo.Port,
ServiceName: baseInfo.ServiceName,
SSLConfig: client.SSLConfig{Enabled: false},
EnvironmentID: baseInfo.EnvironmentID,
DefaultSchema: baseInfo.DefaultSchema,
Type: baseInfo.Type,
Name: &baseInfo.Name,
Username: baseInfo.Username,
Password: &baseInfo.Password,
Host: baseInfo.Host,
Port: baseInfo.Port,
ServiceName: baseInfo.ServiceName,
SSLConfig: client.SSLConfig{Enabled: false},
Properties: interfacePtr(baseInfo.Properties),
EnvironmentID: baseInfo.EnvironmentID,
JdbcURLParameters: mapPtr(baseInfo.JDBCParams),
DefaultSchema: baseInfo.DefaultSchema,
}, nil
}

Expand Down Expand Up @@ -950,6 +970,8 @@ func (sqlWorkbenchService *SqlWorkbenchService) convertDBType(dmsDBType string)
return "MYSQL"
case "PolarDB For MySQL":
return "MYSQL"
case "MongoDB":
return "MONGODB"
default:
return dmsDBType
}
Expand All @@ -966,6 +988,57 @@ func (sqlWorkbenchService *SqlWorkbenchService) SupportDBType(dbType pkgConst.DB
dbType == pkgConst.DBTypePolarDBForMySQL
}

func buildMongoDatasourceOptions(dbService *biz.DBService) (*string, interface{}, map[string]interface{}) {
defaultDatabase := dbService.AdditionalParams.GetParam(mongoDefaultDatabaseParam).String()
var defaultSchema *string
if defaultDatabase != "" {
defaultSchema = &defaultDatabase
}

jdbcParams := map[string]interface{}{}
if authDB := dbService.AdditionalParams.GetParam(mongoAuthDatabaseParam).String(); authDB != "" {
jdbcParams["authSource"] = authDB
}
if authMechanism := dbService.AdditionalParams.GetParam(mongoAuthMechanismParam).String(); authMechanism != "" {
jdbcParams["authMechanism"] = authMechanism
}
if replicaSet := dbService.AdditionalParams.GetParam(mongoReplicaSetParam).String(); replicaSet != "" {
jdbcParams["replicaSet"] = replicaSet
}
if tlsParam := dbService.AdditionalParams.GetParam(mongoTLSEnabledParam); tlsParam != nil && tlsParam.String() != "" {
if tlsParam.Bool() {
jdbcParams["tls"] = "true"
} else {
jdbcParams["tls"] = "false"
}
}
if dbService.AdditionalParams.GetParam(mongoDirectConnectionParam).Bool() {
jdbcParams["directConnection"] = true
}
if dbService.AdditionalParams.GetParam(mongoTLSSkipVerifyParam).Bool() {
jdbcParams["tlsInsecure"] = true
}

if len(jdbcParams) == 0 {
return defaultSchema, nil, nil
}
return defaultSchema, nil, jdbcParams
}

func interfacePtr(v interface{}) *interface{} {
if v == nil {
return nil
}
return &v
}

func mapPtr(v map[string]interface{}) *map[string]interface{} {
if len(v) == 0 {
return nil
}
return &v
}

// buildDatabaseUser 当是ob-mysql时需要给账号管理的账号附加租户名集群名等字符: root@oms_mysql#oms_resource_4250
func buildDatabaseUser(account string, dbServiceUser string, dbType string) string {
if dbType == string(pkgConst.DBTypeOceanBaseMySQL) {
Expand Down
95 changes: 77 additions & 18 deletions internal/sql_workbench/service/sql_workbench_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package sql_workbench
import (
"testing"

"github.com/actiontech/dms/internal/dms/biz"
pkgConst "github.com/actiontech/dms/internal/dms/pkg/constant"
pkgParams "github.com/actiontech/dms/pkg/params"
)

func Test_convertDBType(t *testing.T) {
Expand All @@ -12,17 +14,18 @@ func Test_convertDBType(t *testing.T) {
input string
expected string
}{
"DM": {input: "达梦(DM)", expected: "DM"},
"MySQL": {input: "MySQL", expected: "MYSQL"},
"PostgreSQL": {input: "PostgreSQL", expected: "POSTGRESQL"},
"Oracle": {input: "Oracle", expected: "ORACLE"},
"SQL Server": {input: "SQL Server", expected: "SQL_SERVER"},
"OB Oracle": {input: "OceanBase For Oracle", expected: "OB_ORACLE"},
"OB MySQL": {input: "OceanBase For MySQL", expected: "OB_MYSQL"},
"DM": {input: "达梦(DM)", expected: "DM"},
"MySQL": {input: "MySQL", expected: "MYSQL"},
"PostgreSQL": {input: "PostgreSQL", expected: "POSTGRESQL"},
"Oracle": {input: "Oracle", expected: "ORACLE"},
"SQL Server": {input: "SQL Server", expected: "SQL_SERVER"},
"OB Oracle": {input: "OceanBase For Oracle", expected: "OB_ORACLE"},
"OB MySQL": {input: "OceanBase For MySQL", expected: "OB_MYSQL"},
"TiDB": {input: "TiDB", expected: "TIDB"},
"TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: "MYSQL"},
"TDSQL For InnoDB": {input: "TDSQL For InnoDB", expected: "MYSQL"},
"GoldenDB": {input: "GoldenDB", expected: "MYSQL"},
"PolarDB For MySQL": {input: "PolarDB For MySQL", expected: "MYSQL"},
"MongoDB": {input: "MongoDB", expected: "MONGODB"},
"Unknown passthrough": {input: "UnknownDB", expected: "UnknownDB"},
}
for name, tc := range cases {
Expand All @@ -41,16 +44,17 @@ func Test_SupportDBType(t *testing.T) {
input pkgConst.DBType
expected bool
}{
"DM supported": {input: pkgConst.DBTypeDM, expected: true},
"MySQL supported": {input: pkgConst.DBTypeMySQL, expected: true},
"Oracle supported": {input: pkgConst.DBTypeOracle, expected: true},
"OB MySQL supported": {input: pkgConst.DBTypeOceanBaseMySQL, expected: true},
"TiDB supported": {input: pkgConst.DBTypeTiDB, expected: true},
"TDSQL supported": {input: pkgConst.DBTypeTDSQLForInnoDB, expected: true},
"GoldenDB supported": {input: pkgConst.DBTypeGoldenDB, expected: true},
"PostgreSQL unsupported": {input: pkgConst.DBTypePostgreSQL, expected: false},
"SQL Server unsupported": {input: pkgConst.DBTypeSQLServer, expected: false},
"PolarDB For MySQL supported": {input: pkgConst.DBTypePolarDBForMySQL, expected: true},
"DM supported": {input: pkgConst.DBTypeDM, expected: true},
"MySQL supported": {input: pkgConst.DBTypeMySQL, expected: true},
"Oracle supported": {input: pkgConst.DBTypeOracle, expected: true},
"OB MySQL supported": {input: pkgConst.DBTypeOceanBaseMySQL, expected: true},
"TiDB supported": {input: pkgConst.DBTypeTiDB, expected: true},
"TDSQL supported": {input: pkgConst.DBTypeTDSQLForInnoDB, expected: true},
"GoldenDB supported": {input: pkgConst.DBTypeGoldenDB, expected: true},
"MongoDB unsupported": {input: pkgConst.DBTypeMongoDB, expected: false},
"PostgreSQL unsupported": {input: pkgConst.DBTypePostgreSQL, expected: false},
"SQL Server unsupported": {input: pkgConst.DBTypeSQLServer, expected: false},
"PolarDB For MySQL supported": {input: pkgConst.DBTypePolarDBForMySQL, expected: true},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand All @@ -61,3 +65,58 @@ func Test_SupportDBType(t *testing.T) {
})
}
}

func Test_buildMongoDatasourceOptions(t *testing.T) {
defaultDB := "appdb"
defaultSchema, propertiesValue, jdbcParams := buildMongoDatasourceOptions(&biz.DBService{
DBType: string(pkgConst.DBTypeMongoDB),
Host: "127.0.0.1",
Port: "27017",
AdditionalParams: pkgParams.Params{
&pkgParams.Param{Key: mongoDefaultDatabaseParam, Value: defaultDB, Type: pkgParams.ParamTypeString},
&pkgParams.Param{Key: mongoAuthDatabaseParam, Value: "admin", Type: pkgParams.ParamTypeString},
&pkgParams.Param{Key: mongoAuthMechanismParam, Value: "SCRAM-SHA-256", Type: pkgParams.ParamTypeString},
&pkgParams.Param{Key: mongoReplicaSetParam, Value: "rs0", Type: pkgParams.ParamTypeString},
&pkgParams.Param{Key: mongoTLSEnabledParam, Value: "true", Type: pkgParams.ParamTypeBool},
&pkgParams.Param{Key: mongoDirectConnectionParam, Value: "true", Type: pkgParams.ParamTypeBool},
&pkgParams.Param{Key: mongoTLSSkipVerifyParam, Value: "true", Type: pkgParams.ParamTypeBool},
},
})
if defaultSchema == nil || *defaultSchema != defaultDB {
t.Fatalf("unexpected default schema: %#v", defaultSchema)
}
if propertiesValue != nil {
t.Fatalf("expected nil properties, got %#v", propertiesValue)
}
if jdbcParams["authSource"] != "admin" {
t.Fatalf("unexpected authSource: %#v", jdbcParams["authSource"])
}
if jdbcParams["authMechanism"] != "SCRAM-SHA-256" {
t.Fatalf("unexpected authMechanism: %#v", jdbcParams["authMechanism"])
}
if jdbcParams["replicaSet"] != "rs0" {
t.Fatalf("unexpected replicaSet: %#v", jdbcParams["replicaSet"])
}
if jdbcParams["tls"] != "true" {
t.Fatalf("unexpected tls: %#v", jdbcParams["tls"])
}
if jdbcParams["directConnection"] != true || jdbcParams["tlsInsecure"] != true {
t.Fatalf("unexpected jdbc params: %#v", jdbcParams)
}
}

func Test_buildMongoDatasourceOptions_tlsOnly(t *testing.T) {
_, propertiesValue, jdbcParams := buildMongoDatasourceOptions(&biz.DBService{
DBType: string(pkgConst.DBTypeMongoDB),
AdditionalParams: pkgParams.Params{
&pkgParams.Param{Key: mongoTLSEnabledParam, Value: "true", Type: pkgParams.ParamTypeBool},
},
})
if propertiesValue != nil {
t.Fatalf("expected nil properties, got %#v", propertiesValue)
}
if jdbcParams["tls"] != "true" {
t.Fatalf("expected tls in jdbcUrlParameters when only tls is configured, got %#v", jdbcParams)
}
}