Inside Elasticsearch 5 [1]: Index Creation


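Background

The first pain point we hit in ES is index creation.

With large data volumes, we spread the data across many indices.

As the number of indices grows, creating a new index gets slower and slower.

So indices have to be created ahead of time.

Even so, pre-creating a single index still takes anywhere from a few seconds to more than ten seconds.

This article walks through the whole index creation process and shows where the time goes.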

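ES Basics

Priority task queue

ES builds a priority-aware thread pool on top of PriorityBlockingQueue; see the class org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor.

The enum Priority defines six priority levels: IMMEDIATE, URGENT, HIGH, NORMAL, LOW and LANGUID. The default is NORMAL.

The task that is currently running is tracked in current; tasks waiting to run sit in the thread pool's PriorityBlockingQueue.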
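A minimal sketch of the same idea using only the JDK (the class and method names are hypothetical, not the ES ones): a single-threaded ThreadPoolExecutor backed by a PriorityBlockingQueue, where each task carries one of the six priorities and ties are broken by insertion order, so tasks of equal priority run FIFO. This is only an approximation of what PrioritizedEsThreadPoolExecutor does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PrioritizedExecutorSketch {

    // Same six levels as the ES Priority enum, declared from most to least urgent,
    // so the enum's natural (ordinal) order doubles as the queue order.
    enum Priority { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }

    private static final AtomicLong SEQUENCE = new AtomicLong();

    // Ordered by priority first, then by insertion order (FIFO within one level).
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final Priority priority;
        final long insertionOrder = SEQUENCE.getAndIncrement();
        private final Runnable body;

        PrioritizedTask(Priority priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            int byPriority = priority.compareTo(other.priority);
            return byPriority != 0 ? byPriority : Long.compare(insertionOrder, other.insertionOrder);
        }
    }

    // Queues tasks behind a blocked single worker, then records the order they actually run in.
    static List<String> runDemo() {
        // Tasks must be handed over with execute(): submit() would wrap them in a FutureTask,
        // which is not Comparable, and the priority queue would throw a ClassCastException.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // The first task becomes the worker's initial task and blocks it, so the rest queue up.
        pool.execute(new PrioritizedTask(Priority.NORMAL, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        pool.execute(new PrioritizedTask(Priority.LOW, () -> ran.add("low-1")));
        pool.execute(new PrioritizedTask(Priority.URGENT, () -> ran.add("urgent")));
        pool.execute(new PrioritizedTask(Priority.NORMAL, () -> ran.add("normal")));
        pool.execute(new PrioritizedTask(Priority.LOW, () -> ran.add("low-2")));
        pool.execute(new PrioritizedTask(Priority.IMMEDIATE, () -> ran.add("immediate")));

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ran);
    }
}
```

PrioritizedExecutorSketch.runDemo() returns the tasks in the order immediate, urgent, normal, low-1, low-2: priority decides first, and the two LOW tasks keep their submission order.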

Investigation

The index creation process

Taking index creation as the example, the call sequence is:

Master node: 2.8 s on average

  • org.elasticsearch.cluster.service.ClusterService.ClusterServiceTaskBatcher.run
  • org.elasticsearch.cluster.service.ClusterService.runTasks
  • org.elasticsearch.cluster.service.ClusterService.calculateTaskOutputs
  • org.elasticsearch.cluster.service.ClusterService.executeTasks
  • org.elasticsearch.cluster.routing.allocation.AllocationService.reroute
  • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate
  • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.moveShards (slow: 78%)
  • org.elasticsearch.discovery.zen.ZenDiscovery.publish
  • org.elasticsearch.discovery.zen.PublishClusterStateAction.innerPublish
  • org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler.awaitAllNodes
  • org.elasticsearch.cluster.routing.allocation.AllocationService.buildResultAndLogHealthChange
  • org.elasticsearch.cluster.service.ClusterService.callClusterStateAppliers
  • org.elasticsearch.gateway.DanglingIndicesState.clusterChanged
  • org.elasticsearch.gateway.DanglingIndicesState.processDanglingIndices
  • org.elasticsearch.gateway.DanglingIndicesState.findNewAndAddDanglingIndices
  • org.elasticsearch.gateway.DanglingIndicesState.findNewDanglingIndices
  • org.elasticsearch.gateway.MetaStateService.loadIndicesStates
  • org.elasticsearch.env.NodeEnvironment.availableIndexFolders
  • org.elasticsearch.env.NodeEnvironment.availableIndexFoldersForPath
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState
  • org.elasticsearch.cluster.routing.RoutingTable.Builder.updateNodes
  • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.balance
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.updateFailedShardsCache

Non-master nodes: 117 ms on average

  • org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor.TieBreakingPrioritizedRunnable.run
  • org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor.TieBreakingPrioritizedRunnable.runAndClean
  • org.elasticsearch.cluster.service.TaskBatcher.BatchedTask.run
  • org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed
  • org.elasticsearch.cluster.service.ClusterService.ClusterServiceTaskBatcher.run
  • org.elasticsearch.cluster.service.ClusterService.runTasks
  • org.elasticsearch.cluster.service.ClusterService.publishAndApplyChanges (slow: 50%)
  • org.elasticsearch.cluster.service.ClusterService.callClusterStateAppliers
  • org.elasticsearch.gateway.DanglingIndicesState.clusterChanged
  • org.elasticsearch.gateway.DanglingIndicesState.processDanglingIndices
  • org.elasticsearch.gateway.DanglingIndicesState.findNewAndAddDanglingIndices
  • org.elasticsearch.gateway.DanglingIndicesState.findNewDanglingIndices
  • org.elasticsearch.gateway.MetaStateService.loadIndicesStates
  • org.elasticsearch.env.NodeEnvironment.availableIndexFolders
  • org.elasticsearch.env.NodeEnvironment.availableIndexFoldersForPath (slow: 24%)
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.updateFailedShardsCache
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards
  • org.elasticsearch.indices.store.IndicesStore.clusterChanged
  • org.elasticsearch.cluster.service.ClusterService.calculateTaskOutputs
  • org.elasticsearch.cluster.service.ClusterService.executeTasks
  • org.elasticsearch.cluster.LocalClusterUpdateTask.execute
  • org.elasticsearch.discovery.zen.ZenDiscovery.7.execute
  • org.elasticsearch.gateway.Gateway.applyClusterState
  • org.elasticsearch.gateway.GatewayMetaState.applyClusterState
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.createIndices
  • org.elasticsearch.discovery.zen.PublishClusterStateAction.SendClusterStateRequestHandler.messageReceived
  • org.elasticsearch.discovery.zen.PublishClusterStateAction.handleIncomingClusterStateRequest
  • org.elasticsearch.cluster.ClusterState.ClusterStateDiff.apply
  • org.elasticsearch.cluster.metadata.MetaData.MetaDataDiff.apply
  • org.elasticsearch.gateway.GatewayMetaState.resolveStatesToBeWritten
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices
  • org.elasticsearch.cluster.ClusterState.Builder.metaData
  • org.elasticsearch.index.IndexService.updateMapping
  • org.elasticsearch.index.mapper.MapperService.updateMapping
  • org.elasticsearch.index.mapper.MapperService.internalMerge
  • org.elasticsearch.index.mapper.MapperService.internalMerge
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.removeShards
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.failMissingShards
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.removeUnallocatedIndices
  • org.elasticsearch.indices.cluster.IndicesClusterStateService.deleteIndices
  • org.elasticsearch.cluster.ClusterChangedEvent.indicesDeleted
  • org.elasticsearch.cluster.ClusterChangedEvent.indicesDeletedFromClusterState
  • org.elasticsearch.gateway.MetaStateService.writeIndex
  • org.elasticsearch.gateway.GatewayMetaState.getRelevantIndices
  • org.elasticsearch.gateway.GatewayMetaState.getRelevantIndicesForMasterEligibleNode

So the bottleneck is the moveShards step on the master node.

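Inside moveShards

The detailed metrics for moveShards are listed below. The three trailing numbers are total time (ns), number of calls, and average time per call (ms). This list can be skipped.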

  • void org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.moveShards() 748436740992 292 2563.1395239452054
  • MoveDecision org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.decideMove(ShardRouting) 742236678512 1498398 0.4953535057367724
  • Decision org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain(ShardRouting, RoutingNode, RoutingAllocation) 737779772549 1497960 0.49252303403994374
  • Decision org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain(ShardRouting, RoutingNode, RoutingAllocation) 707818809393 1497960 0.47252185614729286
  • DiskUsage org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.getDiskUsage(RoutingNode, RoutingAllocation, ImmutableOpenMap, boolean) 695320458967 1498982 0.4638617752215915
  • long org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.sizeOfRelocatingShards(RoutingNode, RoutingAllocation, boolean, String) 691377991282 1498982 0.4612316785006712
  • List org.elasticsearch.cluster.routing.RoutingNode.shardsWithState(ShardRoutingState[]) 689490029467 1498982 0.4599721825261711
  • ClusterStateTaskExecutor.ClusterTasksResult org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterState, List) 392892778702 146 2691.0464294657536
  • ClusterState org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.1.execute(ClusterState) 392891070840 146 2691.034731780822
  • ClusterStateTaskExecutor.ClusterTasksResult org.elasticsearch.cluster.action.shard.ShardStateAction.ShardStartedClusterStateTaskExecutor.execute(ClusterState, List) 392514198163 146 2688.4534120753424
  • ClusterState org.elasticsearch.cluster.routing.allocation.AllocationService.applyStartedShards(ClusterState, List) 392503951683 146 2688.3832307054795
  • ClusterState org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(ClusterState, String) 390125461091 146 2672.0921992534245
  • ClusterState org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(ClusterState, String, boolean) 390125174887 146 2672.0902389520547
  • ShardRoutingState org.elasticsearch.cluster.routing.ShardRouting.state() 199744243077 5137136998 3.88824022898111E-5
  • void org.elasticsearch.cluster.service.ClusterService.publishAndApplyChanges(ClusterService.TaskInputs, ClusterService.TaskOutputs) 74597848466 292 255.47208378767124
  • void org.elasticsearch.discovery.zen.ZenDiscovery.publish(ClusterChangedEvent, Discovery.AckListener) 40786588756 292 139.68009847945206
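A quick arithmetic check on this list, using only its own numbers and assuming each total includes the nested calls: total ÷ calls reproduces the average column only when the totals are read as nanoseconds, and the DiskThresholdDecider path (canRemain → getDiskUsage → sizeOfRelocatingShards → shardsWithState) accounts for almost all of the time spent in moveShards.

```latex
\begin{aligned}
\bar{t}_{\text{moveShards}} &= \frac{748\,436\,740\,992\ \text{ns}}{292} \approx 2.563 \times 10^{9}\ \text{ns} \approx 2563\ \text{ms} \\
\frac{t_{\text{DiskThresholdDecider.canRemain}}}{t_{\text{moveShards}}} &= \frac{707\,818\,809\,393}{748\,436\,740\,992} \approx 94.6\% \\
\frac{t_{\text{shardsWithState}}}{t_{\text{moveShards}}} &= \frac{689\,490\,029\,467}{748\,436\,740\,992} \approx 92.1\% \\
\frac{N_{\text{ShardRouting.state}}}{N_{\text{shardsWithState}}} &= \frac{5\,137\,136\,998}{1\,498\,982} \approx 3427
\end{aligned}
```

The last ratio is an inference that assumes most ShardRouting.state() calls come from shardsWithState: each shardsWithState call would then read the state of roughly 3,400 shards, presumably every shard on the node being checked.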

What exactly does moveShards do? Open org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.moveShards.

Its comments are quite thorough:

        /**
         * Move started shards that can not be allocated to a node anymore
         *
         * For each shard to be moved this function executes a move operation
         * to the minimal eligible node with respect to the
         * weight function. If a shard is moved the shard will be set to
         * {@link ShardRoutingState#RELOCATING} and a shadow instance of this
         * shard is created with an incremented version in the state
         * {@link ShardRoutingState#INITIALIZING}.
         */
        public void moveShards() {
            // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
            // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
            // offloading the shards.
            for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
                ShardRouting shardRouting = it.next();
                final MoveDecision moveDecision = decideMove(shardRouting);
                if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
                    final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
                    final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
                    sourceNode.removeShard(shardRouting);
                    Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, targetNode.getNodeId(),
                        allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
                    targetNode.addShard(relocatingShards.v2());
                    if (logger.isTraceEnabled()) {
                        logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
                    }
                } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
                    logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
                }
            }
        }

        /**
         * Makes a decision on whether to move a started shard to another node.  The following rules apply
         * to the {@link MoveDecision} return object:
         *   1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
         *   2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
         *      {@link MoveDecision#canRemainDecision} will have a decision type of YES.  All other fields in the object will be null.
         *   3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
         *      populated with the decision of moving to another node.  If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
         *      {@link MoveDecision#targetNode} will return a non-null value, otherwise the assignedNodeId will be null.
         *   4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
         *      {@link MoveDecision#nodeDecisions} will have a non-null value.
         */
        public MoveDecision decideMove(final ShardRouting shardRouting) {
            if (shardRouting.started() == false) {
                // we can only move started shards
                return MoveDecision.NOT_TAKEN;
            }

            final boolean explain = allocation.debugDecision();
            final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
            assert sourceNode != null && sourceNode.containsShard(shardRouting);
            RoutingNode routingNode = sourceNode.getRoutingNode();
            Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
            if (canRemain.type() != Decision.Type.NO) {
                return MoveDecision.stay(canRemain);
            }

            sorter.reset(shardRouting.getIndexName());
            /*
             * the sorter holds the minimum weight node first for the shards index.
             * We now walk through the nodes until we find a node to allocate the shard.
             * This is not guaranteed to be balanced after this operation we still try best effort to
             * allocate on the minimal eligible node.
             */
            Type bestDecision = Type.NO;
            RoutingNode targetNode = null;
            final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
            int weightRanking = 0;
            for (ModelNode currentNode : sorter.modelNodes) {
                if (currentNode != sourceNode) {
                    RoutingNode target = currentNode.getRoutingNode();
                    // don't use canRebalance as we want hard filtering rules to apply. See #17698
                    Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
                    if (explain) {
                        nodeExplanationMap.add(new NodeAllocationResult(
                            currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
                    }
                    // TODO maybe we can respect throttling here too?
                    if (allocationDecision.type().higherThan(bestDecision)) {
                        bestDecision = allocationDecision.type();
                        if (bestDecision == Type.YES) {
                            targetNode = target;
                            if (explain == false) {
                                // we are not in explain mode and already have a YES decision on the best weighted node,
                                // no need to continue iterating
                                break;
                            }
                        }
                    }
                }
            }

            return MoveDecision.cannotRemain(canRemain, AllocationDecision.fromDecisionType(bestDecision),
                targetNode != null ? targetNode.node() : null, nodeExplanationMap);
        }

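So moveShards walks over every started shard, interleaving between nodes, and decides for each one whether it is still allowed to stay on its current node; a shard that is not allowed to stay is moved to the lowest-weight node that can accept it.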

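Each shard is decided as follows:

  1. If the shard is not started, it is skipped: MoveDecision.isDecisionTaken() returns false.
  2. If the shard may remain on its current node, it stays: MoveDecision.canRemainDecision has decision type YES.
  3. If the shard may not remain on its current node, it is a move candidate: MoveDecision.getAllocationDecision() holds the decision about moving it to another node. If MoveDecision.forceMove() returns true, MoveDecision.targetNode is non-null.
  4. In explain mode (e.g. the cluster allocation explain API), MoveDecision.nodeDecisions is non-null.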
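To see why this step gets slower as indices accumulate, here is a toy cost model in plain Java. It is hypothetical, not ES code, and rests on one assumption taken from the profile: for every started shard, the disk check rescans all shards on that shard's node (getDiskUsage → sizeOfRelocatingShards → shardsWithState runs about once per decideMove). The model counts shard-state reads as a proxy for CPU time.

```java
import java.util.ArrayList;
import java.util.List;

class MoveShardsCostModel {

    // Hypothetical shard states; only STARTED shards get a move decision.
    enum State { INITIALIZING, STARTED, RELOCATING }

    // Hypothetical limit for the toy canRemain rule below.
    private static final int MAX_RELOCATING = 2;

    // Number of shard-state reads, used as a stand-in for CPU cost.
    private long stateReads = 0;

    private State read(State shard) {
        stateReads++;
        return shard;
    }

    // Toy disk-style check: rescan every shard on the node and count relocating ones,
    // loosely mirroring sizeOfRelocatingShards -> RoutingNode.shardsWithState.
    private boolean canRemain(List<State> nodeShards) {
        int relocating = 0;
        for (State s : nodeShards) {
            if (read(s) == State.RELOCATING) {
                relocating++;
            }
        }
        return relocating <= MAX_RELOCATING;
    }

    // One pass of a simplified moveShards (node by node, not interleaved).
    int moveShards(List<List<State>> nodes) {
        int mustMove = 0;
        for (List<State> node : nodes) {
            for (State shard : node) {
                if (read(shard) != State.STARTED) {
                    continue;           // rule 1: only started shards get a decision
                }
                if (canRemain(node)) {
                    continue;           // rule 2: allowed to stay, nothing to do
                }
                mustMove++;             // rule 3: would now search for a target node
            }
        }
        return mustMove;
    }

    // Builds nodeCount nodes with shardsPerNode started shards each and returns
    // how many state reads one moveShards pass costs.
    static long stateReadsFor(int nodeCount, int shardsPerNode) {
        List<List<State>> nodes = new ArrayList<>();
        for (int n = 0; n < nodeCount; n++) {
            List<State> shards = new ArrayList<>();
            for (int i = 0; i < shardsPerNode; i++) {
                shards.add(State.STARTED);
            }
            nodes.add(shards);
        }
        MoveShardsCostModel model = new MoveShardsCostModel();
        model.moveShards(nodes);
        return model.stateReads;
    }
}
```

With n nodes and k shards per node, one pass costs n · k · (k + 1) reads: stateReadsFor(3, 10) is 330, while stateReadsFor(3, 20) is 1260, almost four times as many. If the real check behaves like this, cost grows quadratically with shards per node, which would fit the observation that index creation slows down as indices pile up.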

Solution

Following the official shard allocation documentation, we applied these settings:

  1. Disable the disk threshold check, and monitor disk usage yourself by other means.
    curl -XPUT "localhost:9200/_cluster/settings?timeout=600s" -H 'Content-Type: application/json' -d '{
        "persistent" : {
            "cluster.routing.allocation.disk.threshold_enabled" : false
        }
    }'

  2. Raise recovery concurrency; here it was raised fivefold.
    curl -XPUT "localhost:9200/_cluster/settings?timeout=600s" -H 'Content-Type: application/json' -d '{
        "persistent" : {
            "cluster.routing.allocation.node_concurrent_incoming_recoveries" : 10,
            "cluster.routing.allocation.node_concurrent_outgoing_recoveries" : 10,
            "cluster.routing.allocation.node_concurrent_recoveries" : 20,
            "cluster.routing.allocation.node_initial_primaries_recoveries" : 20
        }
    }'

  3. Stop rebalancing; adjust this to your own scenario. Here it was disabled.
    curl -XPUT "localhost:9200/_cluster/settings?timeout=600s" -H 'Content-Type: application/json' -d '{
        "persistent" : {
            "cluster.routing.rebalance.enable" : "none",
            "cluster.routing.allocation.allow_rebalance" : "indices_all_active",
            "cluster.routing.allocation.cluster_concurrent_rebalance" : 20
        }
    }'
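The same updates can be sent from Java. A minimal sketch using only the JDK's HttpURLConnection, assuming the cluster listens on localhost:9200 as in the curl commands; ClusterSettingsSketch, persistentBody and apply are hypothetical names. It sends a Content-Type: application/json header, which ES 5 accepts and later versions require.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

class ClusterSettingsSketch {

    // Builds {"persistent":{...}} for flat setting keys. Booleans and numbers become JSON
    // literals, everything else a quoted string. No escaping is done: keys and values are
    // assumed to be plain setting names and simple values such as "none".
    static String persistentBody(Map<String, Object> settings) {
        StringJoiner body = new StringJoiner(",", "{\"persistent\":{", "}}");
        for (Map.Entry<String, Object> e : settings.entrySet()) {
            Object v = e.getValue();
            String json = (v instanceof Boolean || v instanceof Number)
                    ? String.valueOf(v)
                    : "\"" + v + "\"";
            body.add("\"" + e.getKey() + "\":" + json);
        }
        return body.toString();
    }

    // Sends the same PUT as the curl commands and returns the HTTP status code.
    static int apply(String baseUrl, Map<String, Object> settings) throws IOException {
        URL url = URI.create(baseUrl + "/_cluster/settings?timeout=600s").toURL();
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(persistentBody(settings).getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }
}
```

For example, put the rebalance keys into a LinkedHashMap<String, Object> (with "none", "indices_all_active" and 20 as values) and call ClusterSettingsSketch.apply("http://localhost:9200", settings); expect status 200 on success.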

Result

Index creation became twice as fast, but there is still room for further optimization.

Further investigation

  • org.elasticsearch.cluster.service.ClusterService.runTasks (8250 ms, 100%)
  • org.elasticsearch.cluster.service.ClusterService.calculateTaskOutputs (62%)
    • org.elasticsearch.cluster.service.ClusterService.executeTasks (61%)
    • org.elasticsearch.cluster.ClusterStateTaskExecutor.execute (61%)
      • org.elasticsearch.cluster.ClusterStateTaskExecutor.execute (61%)
      • org.elasticsearch.cluster.action.shard.ShardStateAction.ShardStartedClusterStateTaskExecutor.execute (31%)
        • org.elasticsearch.cluster.routing.allocation.AllocationService.applyStartedShards (31%, starts shards; could this be parallelized?)
        • org.elasticsearch.cluster.routing.allocation.AllocationService.reroute (20%)
          • org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator.allocate (36%, merged across multiple call sites)
          • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate
            • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.moveShards (31%)
            • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.decideMove (25%)
              • org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain (22%)
              • org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider.canRemain (15%)
                • org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.doDecide (7%)
                  • org.elasticsearch.common.settings.Setting.get (4%)
                  • org.elasticsearch.cluster.metadata.MetaData.getIndexSafe (1%)
                • org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.shouldFilter (4%)
                • org.elasticsearch.cluster.routing.allocation.MoveDecision.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.Decision.type (2%)
                • org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.underCapacity (1%)
                • org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.earlyTerminate (1%)
                • org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain
                • org.elasticsearch.cluster.routing.allocation.decider.Decision.Multi.add (1%)
              • org.elasticsearch.cluster.routing.allocation.decider.Decision.type (1%)
            • org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.access (3%)
        • org.elasticsearch.cluster.routing.allocation.AllocationService.buildResultAndLogHealthChange (10%)
          • org.elasticsearch.cluster.routing.RoutingTable.Builder.updateNodes (7%)
          • org.elasticsearch.cluster.routing.IndexRoutingTable.Builder.addShard (4%)
            • org.elasticsearch.cluster.routing.IndexShardRoutingTable.Builder.build (3%)
          • org.elasticsearch.cluster.routing.RoutingTable.Builder.add (2%)
          • org.elasticsearch.cluster.routing.allocation.RoutingAllocation.updateMetaDataWithRoutingChanges (1%)
        • org.elasticsearch.cluster.routing.allocation.AllocationService.getMutableRoutingNodes (1%)
      • org.elasticsearch.cluster.ClusterStateUpdateTask.execute (30%)
        • org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.1.execute (29%, creates the index)
        • org.elasticsearch.cluster.routing.allocation.AllocationService.reroute (29%)
          • org.elasticsearch.cluster.routing.allocation.AllocationService.reroute (17%)
          • org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator.allocate (36%, merged with the entry above)
          • org.elasticsearch.cluster.routing.allocation.AllocationService.buildResultAndLogHealthChange (11%)
          • org.elasticsearch.cluster.routing.RoutingTable.Builder.updateNodes (7%)
            • org.elasticsearch.cluster.routing.IndexRoutingTable.Builder.addShard (4%)
            • org.elasticsearch.cluster.routing.IndexShardRoutingTable.Builder.build (3%)
            • org.elasticsearch.cluster.routing.RoutingTable.Builder.add (2%)
          • org.elasticsearch.cluster.routing.allocation.RoutingAllocation.updateMetaDataWithRoutingChanges (1%)
          • org.elasticsearch.cluster.routing.allocation.AllocationService.getMutableRoutingNodes (1%)
  • org.elasticsearch.cluster.service.ClusterService.publishAndApplyChanges (38%; how can taskInputs hold several tasks at once?)
    • java.util.function.BiConsumer.accept (21%)
    • org.elasticsearch.cluster.service.ClusterService.callClusterStateAppliers (9%)
    • org.elasticsearch.cluster.ClusterStateApplier.applyClusterState (9%)
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.updateFailedShardsCache (3%)
        • org.elasticsearch.cluster.ClusterState.getRoutingNodes (3%)
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards (2%)
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.removeShards (1%)
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices (1%)
      • org.elasticsearch.indices.cluster.IndicesClusterStateService.failMissingShards (1%)
      • org.elasticsearch.gateway.Gateway.applyClusterState
      • org.elasticsearch.gateway.GatewayMetaState.applyClusterState (1%)
      • org.elasticsearch.gateway.GatewayMetaState.applyClusterState
      • org.elasticsearch.gateway.GatewayMetaState.resolveStatesToBeWritten (1%)
    • java.util.stream.Stream.forEach (8%)
    • org.elasticsearch.cluster.service.ClusterService.lambda$publishAndApplyChanges$7
      • org.elasticsearch.cluster.ClusterStateListener.clusterChanged (8%)
      • org.elasticsearch.gateway.DanglingIndicesState.clusterChanged
        • org.elasticsearch.gateway.DanglingIndicesState.processDanglingIndices (2%)
      • org.elasticsearch.cluster.routing.DelayedAllocationService.clusterChanged
        • org.elasticsearch.cluster.routing.DelayedAllocationService.scheduleIfNeeded (2%)
      • org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService.clusterChanged
        • org.elasticsearch.common.settings.Setting.get (1%)
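On the question attached to publishAndApplyChanges: the moveShards metrics list execute(ClusterState, List) methods, so an executor receives a list of tasks, and queued tasks that share one executor can be applied in a single cluster-state update, paying for one reroute and one publish. The following is a simplified model of that batching idea under that assumption, not ES code; TaskExecutor, PendingTask and drain are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TaskBatchingSketch {

    // Hypothetical stand-in for a cluster-state task executor: it receives a whole batch,
    // applies every task to a copy of the state, and pays for one expensive reroute per batch.
    static final class TaskExecutor {
        int reroutes = 0;

        List<String> execute(List<String> state, List<String> tasks) {
            List<String> next = new ArrayList<>(state);
            next.addAll(tasks);  // apply every task in the batch
            reroutes++;          // the costly step runs once per batch, not once per task
            return next;
        }
    }

    // A queued task together with the executor it was submitted with.
    static final class PendingTask {
        final TaskExecutor executor;
        final String task;

        PendingTask(TaskExecutor executor, String task) {
            this.executor = executor;
            this.task = task;
        }
    }

    // Drains the queue: tasks sharing an executor instance form one batch, batches run in
    // the order their first task was queued, and the resulting state is returned.
    static List<String> drain(List<String> state, List<PendingTask> queue) {
        Map<TaskExecutor, List<String>> batches = new LinkedHashMap<>();
        for (PendingTask p : queue) {
            batches.computeIfAbsent(p.executor, k -> new ArrayList<>()).add(p.task);
        }
        List<String> current = state;
        for (Map.Entry<TaskExecutor, List<String>> batch : batches.entrySet()) {
            current = batch.getKey().execute(current, batch.getValue());
        }
        return current;
    }
}
```

Five tasks queued with one shared executor trigger a single reroute, whereas five tasks that each bring their own executor trigger five. Whether a given ES task type actually shares an executor has to be checked in the source of the version in use.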

Preliminary guess:

Solution

Result
