Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
setProperty("metadata_lease_fence_ms", String.valueOf(metadataLeaseFenceMs));
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
setProperty("time_partition_interval", String.valueOf(partitionInterval));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
cnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
dnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
cnConfig.setPartitionInterval(partitionInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface CommonConfig {

CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold);

CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs);

CommonConfig setPartitionInterval(long partitionInterval);

CommonConfig setCompressor(String compressor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.relational.it.schema;

import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.env.BaseEnv;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Callable;

import static org.junit.Assert.assertTrue;

@RunWith(IoTDBTestRunner.class)
@Category({TableClusterIT.class})
public class IoTDBTableDDLHAIT {

private final Logger LOGGER = LoggerFactory.getLogger(IoTDBTableDDLHAIT.class);

@BeforeClass
public static void setUp() throws Exception {
// Small fence threshold so the ConfigNode can prove the stopped DataNode is self-fenced quickly
// (T_proceed = fence + ~5s internal margin), keeping the test fast. Live DataNodes keep
// heartbeating (~1s), so they do not spuriously fence.
// Use 3 replicas so metadata/data-region operations such as DELETE DEVICES can still succeed
// after one DataNode is stopped.
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setMetadataLeaseFenceMs(20000) // default value
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataReplicationFactor(3);
EnvFactory.getEnv().initClusterEnvironment(1, 3);
}

@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void tableDdlSucceedsWhileOneDataNodeIsDown() throws Exception {
final String databaseName = "test_table_ddl_ha";
final String tableName = "table_ddl_ha";
final String createdAfterDownTableName = "table_ddl_ha_created_after_down";
final DataNodeWrapper liveDataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
final DataNodeWrapper victimDataNode = EnvFactory.getEnv().getDataNodeWrapper(2);

// Pin the connection to a DataNode we will keep alive, so stopping the victim cannot break it.
try (final Connection connection =
EnvFactory.getEnv()
.getConnection(liveDataNode, "root", "root", BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE " + databaseName);
statement.execute("USE " + databaseName);
statement.execute("CREATE TABLE " + tableName + " (dev STRING TAG, s1 INT32 FIELD)");
statement.execute(
"INSERT INTO "
+ tableName
+ "(time, dev, s1) VALUES(1, 'dev01', 1), (2, 'dev02', 2), (3, 'dev03', 3)");

// ready for the drop database
statement.execute("CREATE TABLE TABLE1 (dev STRING TAG, s1 INT32 FIELD)");
statement.execute(
"INSERT INTO TABLE1 (time, dev, s1) VALUES(1, 'dev01', 1), (2, 'dev02', 2), (3, 'dev03', 3)");
// Take one DataNode down. Its last successful ConfigNode contact is now frozen; after
// T_proceed the ConfigNode can treat it as self-fenced and stop waiting for its ack.
victimDataNode.stop();
Assert.assertFalse("victim DataNode should be stopped", victimDataNode.isAlive());

// The DDL broadcast can no longer reach the stopped DataNode. Previously this hard-failed;
// now it must still succeed (after blocking ~T_proceed while the fence is proven).
LOGGER.info("0. start to test high availability of creating table procedure");
assertStatementEffect(
statement,
"CREATE TABLE "
+ createdAfterDownTableName
+ " (region STRING TAG, temperature FLOAT FIELD)",
() -> tableExists(statement, createdAfterDownTableName),
"CREATE TABLE must succeed with one DataNode down");

LOGGER.info("1. start to test high availability of adding column procedure");
assertStatementEffect(
statement,
"ALTER TABLE " + tableName + " ADD COLUMN s2 INT32 FIELD",
() -> columnHasType(statement, tableName, "s2", "INT32"),
"ADD COLUMN must succeed with one DataNode down");

LOGGER.info("2. start to test high availability of altering column type procedure");
assertStatementEffect(
statement,
"ALTER TABLE " + tableName + " ALTER COLUMN s2 SET DATA TYPE INT64",
() -> columnHasType(statement, tableName, "s2", "INT64"),
"ALTER COLUMN TYPE must succeed with one DataNode down");

LOGGER.info("3. start to test high availability of altering table ttl procedure");
assertStatementEffect(
statement,
"ALTER TABLE " + tableName + " SET PROPERTIES ttl = 864000",
() -> tableHasTtl(statement, tableName, "864000"),
"ALTER TABLE TTL must succeed with one DataNode down");

LOGGER.info("4. start to test high availability of resetting table ttl procedure");
assertStatementEffect(
statement,
"ALTER TABLE " + tableName + " SET PROPERTIES ttl = 'INF'",
() -> tableHasTtl(statement, tableName, "INF"),
"ALTER TABLE TTL reset must succeed with one DataNode down");

LOGGER.info("5. start to test high availability of deleting devices procedure");
assertStatementEffect(
statement,
"DELETE DEVICES FROM " + tableName + " WHERE dev = 'dev02'",
() -> !deviceExists(statement, tableName, "dev02"),
"DELETE DEVICES must succeed with one DataNode down");

LOGGER.info("6. start to test high availability of dropping table procedure");
assertStatementEffect(
statement,
"DROP TABLE " + tableName,
() -> !tableExists(statement, tableName),
"DROP TABLE must succeed with one DataNode down");

LOGGER.info("7. start to test high availability of dropping database procedure");
assertStatementEffect(
statement,
"DROP DATABASE " + databaseName,
() -> !databaseExists(statement, databaseName),
"DROP DATABASE must succeed with one DataNode down");
}
}

private void assertStatementEffect(
final Statement statement,
final String sql,
final Callable<Boolean> effect,
final String message)
throws Exception {
statement.execute(sql);
assertTrue(message, effect.call());
}

private boolean tableExists(final Statement statement, final String tableName) throws Exception {
try (final ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
while (resultSet.next()) {
if (tableName.equalsIgnoreCase(resultSet.getString(1))) {
return true;
}
}
}
return false;
}

private boolean columnHasType(
final Statement statement,
final String tableName,
final String columnName,
final String dataType)
throws Exception {
try (final ResultSet resultSet = statement.executeQuery("DESCRIBE " + tableName)) {
while (resultSet.next()) {
if (columnName.equalsIgnoreCase(resultSet.getString(1))) {
return dataType.equalsIgnoreCase(resultSet.getString(2));
}
}
}
return false;
}

private boolean tableHasTtl(final Statement statement, final String tableName, final String ttl)
throws Exception {
try (final ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
while (resultSet.next()) {
if (tableName.equalsIgnoreCase(resultSet.getString(1))) {
return ttl.equalsIgnoreCase(resultSet.getString(2));
}
}
}
return false;
}

private boolean deviceExists(
final Statement statement, final String tableName, final String device) throws Exception {
try (final ResultSet resultSet =
statement.executeQuery(
"SHOW DEVICES FROM " + tableName + " WHERE dev = '" + device + "'")) {
return resultSet.next();
}
}

private boolean databaseExists(final Statement statement, final String databaseName)
throws Exception {
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {
while (resultSet.next()) {
if (databaseName.equalsIgnoreCase(resultSet.getString(1))) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public enum TSStatusCode {
TYPE_NOT_FOUND(528),
DATABASE_CONFLICT(529),
DATABASE_MODEL(530),
METADATA_LEASE_FENCED(531),

TABLE_NOT_EXISTS(550),
TABLE_ALREADY_EXISTS(551),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ public final class ProcedureMessages {
"Failed to pre-release {} for table {}.{} to DataNode, failure results: {}";
public static final String FAILED_TO_PRE_SET_TEMPLATE_ON_PATH_DUE_TO =
"Failed to pre set template {} on path {} due to {}";
public static final String FAILED_TO_PROVE_DN_IS_FENCED = "Failed to prove DN is fenced";
public static final String FAILED_TO_PUSH_CONSUMER_GROUP_META_TO_DATANODES_DETAILS =
"Failed to push consumer group meta to dataNodes, details: %s";
public static final String FAILED_TO_PUSH_PIPE_META_LIST_TO_DATA_NODES_WILL =
Expand Down Expand Up @@ -510,7 +511,7 @@ public final class ProcedureMessages {
public static final String FAILED_TO_SYNC_TABLE_PRE_CREATE_INFO_TO_DATANODE_FAILURE =
"Failed to sync table {}.{} pre-create info to DataNode, failure results: {}";
public static final String FAILED_TO_SYNC_TABLE_ROLLBACK_CREATE_INFO_TO_DATANODE_FAILURE =
"Failed to sync table {}.{} rollback-create info to DataNode {}, failure results: ";
"Failed to sync table {}.{} rollback-create info to DataNode, failure results: {}";
public static final String FAILED_TO_SYNC_TEMPLATE_COMMIT_SET_INFO_ON_PATH_TO =
"Failed to sync template {} commit-set info on path {} to DataNode {}";
public static final String FAILED_TO_SYNC_TEMPLATE_PRE_SET_INFO_ON_PATH_TO =
Expand Down Expand Up @@ -575,8 +576,10 @@ public final class ProcedureMessages {
"Invalidate view schemaengine cache failed";
public static final String INVALIDATING_CACHE_FOR_COLUMN_IN_WHEN_DROPPING_COLUMN =
"Invalidating cache for column {} in {}.{} when dropping column";
public static final String INVALIDATING_CACHE_FOR_TABLE_WHEN_DROPPING_TABLE =
"Invalidating cache for table {}.{} when dropping table";
public static final String PRE_RELEASE_DELETE_TABLE_WHEN_DROPPING_TABLE =
"pre release delete table {}.{} when dropping table";
public static final String COMMIT_RELEASE_DELETE_TABLE_WHEN_DROPPING_TABLE =
"commit release delete table {}.{} when dropping table";
public static final String INVALID_DATA_TYPE_CANNOT_BE_USED_AS_A_NEW_TYPE =
"Invalid data type cannot be used as a new type";
public static final String IO_ERROR_WHEN_DESERIALIZE_AUTHPLAN =
Expand Down Expand Up @@ -845,6 +848,8 @@ public final class ProcedureMessages {
public static final String ROLLBACK_CREATETABLE_COSTS_MS = "Rollback CreateTable-{} costs {}ms.";
public static final String ROLLBACK_CREATE_TABLE_FAILED = "Rollback create table failed";
public static final String ROLLBACK_DROPTABLE_COSTS_MS = "Rollback DropTable-{} costs {}ms.";
public static final String ROLLBACK_PRE_DELETE_TABLE_FAILED =
"Rollback pre-delete table %s.%s failed, please manually drop the table";
public static final String ROLLBACK_PRE_RELEASE = "Rollback pre-release ";
public static final String ROLLBACK_PRE_RELEASE_TEMPLATE_FAILED =
"Rollback pre release template failed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ public final class ProcedureMessages {
"Failed to pre-release {} for table {}.{} to DataNode, failure results: {}";
public static final String FAILED_TO_PRE_SET_TEMPLATE_ON_PATH_DUE_TO =
"Failed to pre set template {} on path {} due to {}";
public static final String FAILED_TO_PROVE_DN_IS_FENCED = "不能证明一个不可达的DN已经处于隔离状态";
public static final String FAILED_TO_PUSH_CONSUMER_GROUP_META_TO_DATANODES_DETAILS =
"Failed to push consumer group meta to dataNodes, details: %s";
public static final String FAILED_TO_PUSH_PIPE_META_LIST_TO_DATA_NODES_WILL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum CnToDnAsyncRequestType {
CHANGE_REGION_LEADER,

// Cache
INVALIDATE_PARTITION_CACHE,
INVALIDATE_SCHEMA_CACHE,
INVALIDATE_LAST_CACHE,
CLEAR_CACHE,
Expand Down Expand Up @@ -121,7 +122,7 @@ public enum CnToDnAsyncRequestType {

// Table
UPDATE_TABLE,
INVALIDATE_TABLE_CACHE,
PRE_DELETE_TABLE,
DELETE_DATA_FOR_DROP_TABLE,
DELETE_DEVICES_FOR_DROP_TABLE,
INVALIDATE_COLUMN_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ protected void initActionMapBuilder() {
(req, client, handler) ->
client.fetchSchemaBlackList(
(TFetchSchemaBlackListReq) req, (FetchSchemaBlackListRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INVALIDATE_PARTITION_CACHE,
(req, client, handler) ->
client.invalidatePartitionCache(
(TInvalidateCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INVALIDATE_SCHEMA_CACHE,
(req, client, handler) ->
Expand Down Expand Up @@ -442,7 +447,7 @@ protected void initActionMapBuilder() {
(req, client, handler) ->
client.updateTable((TUpdateTableReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INVALIDATE_TABLE_CACHE,
CnToDnAsyncRequestType.PRE_DELETE_TABLE,
(req, client, handler) ->
client.invalidateTableCache(
(TInvalidateTableCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.lease.DataNodeContactTracker;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
Expand Down Expand Up @@ -93,6 +94,11 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
}

private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) {
// A successful response confirms ConfigNode->DataNode contact; stamp it on the ConfigNode clock
// for the metadata-lease verdict. Kept separate from the load-cache samples (which record the
// echoed send-time) and deliberately not touched in onError, so failures never advance it.
final DataNodeContactTracker contactTracker = DataNodeContactTracker.getInstance();
contactTracker.recordSuccessfulResponse(nodeId);
loadManager
.getLoadCache()
.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp));
Expand Down
Loading
Loading