%% -------------------------------------------------------------------
%%
%% riak_core: Core Riak Application
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc riak_core_gossip takes care of the mechanics of shuttling a ring
%% from one node to another upon request by other Riak processes.
%%
%% Additionally, every time its gossip token bucket is refilled it sends
%% a copy of the local ring to some other random node, ensuring that all
%% nodes eventually synchronize on the same understanding of the Riak
%% cluster. The refill interval is the second element of the
%% `gossip_limit' application variable and defaults to 10 seconds
%% (see ?DEFAULT_LIMIT).
%%
%% The module also exports helpers for reassigning partition ownership
%% (claim_until_balanced/2, remove_from_cluster/2).

-module(riak_core_gossip).

-behaviour(gen_server).

-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-export([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/2,
         claim_until_balanced/2, random_gossip/1,
         recursive_gossip/1, random_recursive_gossip/1, rejoin/2,
         gossip_version/0, legacy_gossip/0, legacy_gossip/1]).

-include("riak_core_ring.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

%% Default gossip rate limit as {Tokens, ResetIntervalMs}: allow at most
%% 45 ring sends ({send_ring_to, Node} requests) every 10 seconds.
-define(DEFAULT_LIMIT, {45, 10000}).

%% gossip_versions: orddict of Node -> gossip version learned from the
%%                  member metadata of rings this server has seen.
%% gossip_tokens:   number of ring sends still allowed before the next
%%                  token reset.
-record(state, {gossip_versions, gossip_tokens}).
%% ===================================================================
%% Public API
%% ===================================================================

%% @doc Ask the local gossip server to send `Ring' to every active
%% member of that ring.
distribute_ring(Ring) ->
    LocalServer = {?MODULE, node()},
    gen_server:cast(LocalServer, {distribute_ring, Ring}).

%% @doc Send this node's ring to `ToNode'.
send_ring(ToNode) ->
    Self = node(),
    send_ring(Self, ToNode).

%% @doc Ask the gossip server on `FromNode' to send its ring to `ToNode'.
%% A no-op returning `ok' when both nodes are the same.
send_ring(FromNode, ToNode) when FromNode =:= ToNode ->
    ok;
send_ring(FromNode, ToNode) ->
    Server = {?MODULE, FromNode},
    gen_server:cast(Server, {send_ring_to, ToNode}).

%% @doc Start the gossip server, registered locally as ?MODULE.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Cast `stop' to the locally registered gossip server.
stop() ->
    gen_server:cast(?MODULE, stop).

%% @doc Ask the gossip server on `Node' to rejoin the cluster that
%% `Ring' describes.
rejoin(Node, Ring) ->
    Server = {?MODULE, Node},
    gen_server:cast(Server, {rejoin, Ring}).

%% @doc Synchronously ask the local gossip server whether legacy gossip
%% must be used.
legacy_gossip() ->
    gen_server:call(?MODULE, legacy_gossip).

%% @doc Synchronously ask the local gossip server whether `Node' is
%% known (or assumed) to use legacy gossip.
legacy_gossip(Node) ->
    Request = {legacy_gossip, Node},
    gen_server:call(?MODULE, Request).

%% @doc Gossip state to a random other active node in the ring.
random_gossip(Ring) ->
    Target = riak_core_ring:random_other_active_node(Ring),
    case Target of
        no_node ->
            %% No other active node: this must be a single node cluster.
            ok;
        _ ->
            send_ring(node(), Target)
    end.

%% @doc Gossip state to a fixed set of nodes determined from a binary
%% tree decomposition of the active membership.
%%
%% The active members are arranged into a binary tree (with cycles) and
%% this node sends its ring to the children of `Node' in that tree. The
%% gossip is recursive because each receiver may in turn call
%% recursive_gossip, gossiping to its own children, so the fan-out grows
%% logarithmically until it covers the whole cluster.
recursive_gossip(Ring, Node) ->
    Active = riak_core_ring:active_members(Ring),
    Tree = riak_core_util:build_tree(2, Active, [cycles]),
    lists:foreach(fun(Child) -> send_ring(node(), Child) end,
                  orddict:fetch(Node, Tree)),
    ok.
%% @doc Gossip the ring recursively, starting from this node.
%%
%% A non-active member will not show up in the tree decomposition built
%% by recursive_gossip/2, so in that case we fall back to
%% random_recursive_gossip/1.
recursive_gossip(Ring) ->
    Active = riak_core_ring:active_members(Ring),
    case lists:member(node(), Active) of
        true ->
            recursive_gossip(Ring, node());
        false ->
            random_recursive_gossip(Ring)
    end.

%% @doc Send this node's ring to the tree children of a randomly chosen
%% active member of `Ring'. Returns `ok' without sending anything when
%% the ring has no active members.
random_recursive_gossip(Ring) ->
    case riak_core_ring:active_members(Ring) of
        [] ->
            %% random:uniform/1 requires N >= 1, so an empty member list
            %% must be handled explicitly instead of crashing.
            ok;
        Active ->
            RNode = lists:nth(random:uniform(length(Active)), Active),
            recursive_gossip(Ring, RNode)
    end.

%% ===================================================================
%% gen_server behaviour
%% ===================================================================

%% @private
%% Schedules the first token reset, seeds the known gossip versions from
%% the local raw ring, and starts with a full token bucket.
init(_State) ->
    schedule_next_reset(),
    {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
    {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    State = update_known_versions(Ring,
                                  #state{gossip_versions=orddict:new(),
                                         gossip_tokens=Tokens}),
    {ok, State}.

%% @private
%% legacy_gossip:         reply whether legacy gossip must be used for
%%                        the local ring (see check_legacy_gossip/2).
%% {legacy_gossip, Node}: reply whether Node is known (or assumed) to
%%                        use legacy gossip.
%% Both requests refresh the known gossip versions from the local raw
%% ring and keep the result in the server state. Any other request
%% replies `ok'.
handle_call(legacy_gossip, _From, State) ->
    {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
    %% Keep the versions learned from the ring, as the
    %% {legacy_gossip, Node} clause does, rather than discarding them.
    State2 = update_known_versions(Ring, State),
    Reply = check_legacy_gossip(Ring, State2),
    {reply, Reply, State2};
handle_call({legacy_gossip, Node}, _From, State) ->
    {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
    State2 = update_known_versions(MyRing, State),
    Reply = known_legacy_gossip(Node, State2),
    {reply, Reply, State2};
handle_call(_, _From, State) ->
    {reply, ok, State}.

%% Make this node's `gossip_vsn' member metadata in Ring match
%% gossip_version/0, updating the ring (as this node) only when it differs.
update_gossip_version(Ring) ->
    CurrentVsn = riak_core_ring:get_member_meta(Ring, node(), gossip_vsn),
    DesiredVsn = gossip_version(),
    case CurrentVsn of
        DesiredVsn ->
            %% DesiredVsn is already bound, so this clause only matches
            %% when the ring is already up to date.
            Ring;
        _ ->
            riak_core_ring:update_member_meta(node(), Ring, node(),
                                              gossip_vsn, DesiredVsn)
    end.

%% Returns true when Node's gossip version is unknown or equals
%% ?LEGACY_RING_VSN, false otherwise.
known_legacy_gossip(Node, State) ->
    case orddict:find(Node, State#state.gossip_versions) of
        error -> true;
        {ok, ?LEGACY_RING_VSN} -> true;
        _ -> false
    end.
%% Decide whether legacy gossip must be used for Ring: true when Ring is
%% itself a legacy ring, or when any of its members is known (or assumed)
%% to use legacy gossip. Known versions are first refreshed from the local
%% raw ring; the refreshed state is not returned.
check_legacy_gossip(Ring, State) ->
    {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
    Known = update_known_versions(MyRing, State),
    riak_core_ring:legacy_ring(Ring) orelse
        lists:any(fun(Member) -> known_legacy_gossip(Member, Known) end,
                  riak_core_ring:all_members(Ring)).

%% Fold step: record Node's advertised gossip version from OtherRing.
%% A node without a `gossip_vsn' entry is recorded as legacy only when it
%% is the ring's owner; otherwise nothing is recorded for it.
update_known_version(Node, {OtherRing, GVsns}) ->
    case riak_core_ring:get_member_meta(OtherRing, Node, gossip_vsn) of
        undefined ->
            case riak_core_ring:owner_node(OtherRing) =:= Node of
                true ->
                    %% Ring owner defaults to legacy gossip if unspecified.
                    {OtherRing, orddict:store(Node, ?LEGACY_RING_VSN, GVsns)};
                false ->
                    {OtherRing, GVsns}
            end;
        GossipVsn ->
            {OtherRing, orddict:store(Node, GossipVsn, GVsns)}
    end.

%% Merge the gossip versions advertised by every member of OtherRing into
%% the server state.
update_known_versions(OtherRing, State=#state{gossip_versions=GVsns}) ->
    Members = riak_core_ring:all_members(OtherRing),
    {_, Updated} = lists:foldl(fun update_known_version/2,
                               {OtherRing, GVsns}, Members),
    State#state{gossip_versions=Updated}.

%% Gossip version this node speaks: ?LEGACY_RING_VSN when the riak_core
%% `legacy_gossip' env is exactly `true', ?CURRENT_RING_VSN otherwise.
gossip_version() ->
    LegacyEnv = app_helper:get_env(riak_core, legacy_gossip),
    if
        LegacyEnv =:= true -> ?LEGACY_RING_VSN;
        true -> ?CURRENT_RING_VSN
    end.

%% Gossip version of Node: taken from Ring's member metadata when present,
%% otherwise asked of Node over RPC (1000 ms timeout), assuming legacy
%% gossip if the RPC fails.
rpc_gossip_version(Ring, Node) ->
    case riak_core_ring:get_member_meta(Ring, Node, gossip_vsn) of
        undefined ->
            case rpc:call(Node, riak_core_gossip, gossip_version, [], 1000) of
                {badrpc, _} -> ?LEGACY_RING_VSN;
                Vsn -> Vsn
            end;
        GossipVsn ->
            GossipVsn
    end.
%% @private
%% Asynchronous requests:
%%   {send_ring_to, Node} - push the local ring (downgraded to the gossip
%%                          version Node understands) to Node, consuming
%%                          one gossip token; ignored when none remain.
%%   {distribute_ring, R} - abcast R to all active members of R.
%%   {reconcile_ring, R}  - merge a ring received from another node.
%%   reset_tokens         - reschedule the next reset, trigger a random
%%                          gossip and refill the token bucket.
%%   gossip_ring          - gossip the local ring to a random other node.
%%   {rejoin, R}          - join the owner of R when R has the same
%%                          cluster name as the local ring.
%%   stop                 - stop the server (sent by stop/0).
handle_cast({send_ring_to, _Node}, State=#state{gossip_tokens=0}) ->
    %% Out of gossip tokens, ignore the send request
    {noreply, State};
handle_cast({send_ring_to, Node}, State) ->
    {ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
    MyRing = update_gossip_version(MyRing0),
    GossipVsn = case gossip_version() of
                    ?LEGACY_RING_VSN ->
                        ?LEGACY_RING_VSN;
                    _ ->
                        rpc_gossip_version(MyRing, Node)
                end,
    RingOut = riak_core_ring:downgrade(GossipVsn, MyRing),
    riak_core_ring:check_tainted(RingOut,
                                 "Error: riak_core_gossip/send_ring_to :: "
                                 "Sending tainted ring over gossip"),
    gen_server:cast({?MODULE, Node}, {reconcile_ring, RingOut}),
    Tokens = State#state.gossip_tokens - 1,
    {noreply, State#state{gossip_tokens=Tokens}};
handle_cast({distribute_ring, Ring}, State) ->
    RingOut = case check_legacy_gossip(Ring, State) of
                  true ->
                      riak_core_ring:downgrade(?LEGACY_RING_VSN, Ring);
                  false ->
                      Ring
              end,
    Nodes = riak_core_ring:active_members(Ring),
    riak_core_ring:check_tainted(RingOut,
                                 "Error: riak_core_gossip/distribute_ring :: "
                                 "Sending tainted ring over gossip"),
    gen_server:abcast(Nodes, ?MODULE, {reconcile_ring, RingOut}),
    {noreply, State};
handle_cast({reconcile_ring, RingIn}, State) ->
    OtherRing = riak_core_ring:upgrade(RingIn),
    State2 = update_known_versions(OtherRing, State),
    case check_legacy_gossip(RingIn, State2) of
        true ->
            LegacyRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, OtherRing),
            riak_core_gossip_legacy:handle_cast({reconcile_ring, LegacyRing},
                                                State2),
            {noreply, State2};
        false ->
            %% Compare the two rings, see if there is anything that
            %% must be done to make them equal...
            riak_core_stat:update(gossip_received),
            riak_core_ring_manager:ring_trans(fun reconcile/2, [OtherRing]),
            {noreply, State2}
    end;
handle_cast(reset_tokens, State) ->
    schedule_next_reset(),
    gen_server:cast(?MODULE, gossip_ring),
    {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    {noreply, State#state{gossip_tokens=Tokens}};
handle_cast(gossip_ring, State) ->
    %% Gossip the ring to some random other node...
    {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
    random_gossip(MyRing),
    {noreply, State};
handle_cast({rejoin, RingIn}, State) ->
    OtherRing = riak_core_ring:upgrade(RingIn),
    {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
    SameCluster = (riak_core_ring:cluster_name(Ring) =:=
                       riak_core_ring:cluster_name(OtherRing)),
    case SameCluster of
        true ->
            Legacy = check_legacy_gossip(Ring, State),
            OtherNode = riak_core_ring:owner_node(OtherRing),
            riak_core:join(Legacy, node(), OtherNode, true),
            {noreply, State};
        false ->
            {noreply, State}
    end;
handle_cast(stop, State) ->
    %% Sent by stop/0. Without this clause the request fell through to
    %% the catch-all below and the server never stopped.
    {stop, normal, State};
handle_cast(_, State) ->
    {noreply, State}.

%% @private
handle_info(_Info, State) ->
    {noreply, State}.

%% @private
terminate(_Reason, _State) ->
    ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ===================================================================
%% Internal functions
%% ===================================================================

%% Arrange for `reset_tokens' to be cast to the local gossip server after
%% the reset interval (second element of the `gossip_limit' env).
schedule_next_reset() ->
    {_, Reset} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    timer:apply_after(Reset, gen_server, cast, [?MODULE, reset_tokens]).

%% Ring transaction run via riak_core_ring_manager:ring_trans/2 to merge
%% a ring received over gossip into the local ring.
%% Returns `{reconciled_ring, NewRing}' when the merge produced a new ring,
%% `ignore' otherwise. Gossip from another cluster, or from a node the
%% merged membership considers `invalid' or `down', is not merged; a
%% `down' sender is told to rejoin and an `invalid' one is sent a ring
%% refresh.
reconcile(Ring0, [OtherRing0]) ->
    %% Due to rolling upgrades and legacy gossip, a ring's cluster name
    %% may be temporarily undefined. This is eventually fixed by the claimant.
    {Ring, OtherRing} = riak_core_ring:reconcile_names(Ring0, OtherRing0),
    Node = node(),
    OtherNode = riak_core_ring:owner_node(OtherRing),
    Members = riak_core_ring:reconcile_members(Ring, OtherRing),
    WrongCluster = (riak_core_ring:cluster_name(Ring) /=
                        riak_core_ring:cluster_name(OtherRing)),
    PreStatus = riak_core_ring:member_status(Members, OtherNode),
    IgnoreGossip = WrongCluster orelse
        PreStatus =:= invalid orelse
        PreStatus =:= down,
    {Changed, Ring2} =
        case IgnoreGossip of
            true -> {false, Ring};
            false -> riak_core_ring:reconcile(OtherRing, Ring)
        end,
    OtherStatus = riak_core_ring:member_status(Ring2, OtherNode),
    case {WrongCluster, OtherStatus, Changed} of
        {true, _, _} ->
            %% TODO: Tell other node to stop gossiping to this node.
            riak_core_stat:update(ignored_gossip),
            ignore;
        {_, down, _} ->
            %% Tell other node to rejoin the cluster.
            riak_core_gossip:rejoin(OtherNode, Ring2),
            ignore;
        {_, invalid, _} ->
            %% Exiting/Removed node never saw shutdown cast, re-send.
            ClusterName = riak_core_ring:cluster_name(Ring),
            riak_core_ring_manager:refresh_ring(OtherNode, ClusterName),
            ignore;
        {_, _, new_ring} ->
            Ring3 = riak_core_ring:ring_changed(Node, Ring2),
            riak_core_stat:update(rings_reconciled),
            log_membership_changes(Ring, Ring3),
            {reconciled_ring, Ring3};
        {_, _, _} ->
            ignore
    end.
%% Log (via lager:info) every node whose membership status differs
%% between OldRing and NewRing: nodes that joined, nodes that were
%% removed, and nodes whose status changed. Always returns ok.
log_membership_changes(OldRing, NewRing) ->
    Before = orddict:from_list(riak_core_ring:all_member_status(OldRing)),
    After = orddict:from_list(riak_core_ring:all_member_status(NewRing)),
    %% Give each side an `undefined' entry for every node only the other
    %% side knows about, so both dictionaries share the same key set.
    Placeholders = fun(Dict) -> [{N, undefined} || {N, _} <- Dict] end,
    KeepOwn = fun(_, Status, _) -> Status end,
    BeforePadded = orddict:merge(KeepOwn, Before, Placeholders(After)),
    AfterPadded = orddict:merge(KeepOwn, After, Placeholders(Before)),
    %% Merging the padded dictionaries visits every node once; the merge
    %% result itself is not needed.
    orddict:merge(
      fun(_, Same, Same) ->
              Same;
         (N, undefined, New) ->
              lager:info("'~s' joined cluster with status '~s'~n", [N, New]);
         (N, Old, undefined) ->
              lager:info("'~s' removed from cluster (previously: "
                         "'~s')~n", [N, Old]);
         (N, Old, New) ->
              lager:info("'~s' changed from '~s' to '~s'~n", [N, Old, New])
      end, BeforePadded, AfterPadded),
    ok.

%% Repeatedly apply the configured `choose_claim_fun' to Ring on behalf
%% of Node until the configured `wants_claim_fun' answers `no'.
claim_until_balanced(Ring, Node) ->
    {WMod, WFun} = app_helper:get_env(riak_core, wants_claim_fun),
    case WMod:WFun(Ring, Node) of
        no ->
            Ring;
        {yes, _NumToClaim} ->
            {CMod, CFun} = app_helper:get_env(riak_core, choose_claim_fun),
            claim_until_balanced(CMod:CFun(Ring, Node), Node)
    end.

%% Return a ring in which ExitingNode owns no partitions. A simple
%% transfer that respects target_n_val is tried first; if it fails, all
%% of ExitingNode's partitions go to one other claiming member and the
%% ring is rebalanced with riak_core_claim:claim_rebalance_n/2.
remove_from_cluster(Ring, ExitingNode) ->
    AllOwners = riak_core_ring:all_owners(Ring),
    case attempt_simple_transfer(Ring, AllOwners, ExitingNode) of
        {ok, SimpleRing} ->
            SimpleRing;
        target_n_fail ->
            %% Re-diagonalize: hand every claim to *any* one other member
            %% first, just so the rebalance does not include the exiting
            %% node.
            Members = riak_core_ring:claiming_members(Ring),
            Other = hd(lists:delete(ExitingNode, Members)),
            ExitingIdxs = [I || {I, N} <- AllOwners, N == ExitingNode],
            TempRing = lists:foldl(
                         fun(I, R) -> riak_core_ring:transfer_node(I, Other, R) end,
                         Ring, ExitingIdxs),
            riak_core_claim:claim_rebalance_n(TempRing, Other)
    end.
%% Try to reassign every partition owned by ExitingNode to one of the
%% remaining claiming members without placing the same node within
%% `target_n_val' positions of itself.
%% Returns {ok, NewRing} on success, or target_n_fail as soon as some
%% partition has no qualifying candidate.
%% NOTE(review): assumes Owners is the [{Index, Node}] list in ring order
%% as produced by riak_core_ring:all_owners/1 -- TODO confirm.
attempt_simple_transfer(Ring, Owners, ExitingNode) ->
    TargetN = app_helper:get_env(riak_core, target_n_val),
    %% Every remaining claimant starts as "last seen" at position -TargetN,
    %% so any of them satisfies the backward distance check at position 0.
    attempt_simple_transfer(Ring, Owners, TargetN, ExitingNode, 0,
                            [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring),
                                             O /= ExitingNode]).

%% Walk Owners in order. Idx is the current position; Last maps each
%% remaining claimant to the position where it was last seen owning (or
%% being given) a partition.
%% NOTE(review): only positions after Idx (Rest) and before it (Last) are
%% compared; the check does not appear to wrap around the end of the ring.
attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
    %% handoff
    %% Candidates: nodes last seen at least TargetN positions back.
    case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
        [] ->
            target_n_fail;
        Candidates ->
            %% these nodes don't violate target_n in the reverse direction
            %% StepsToNext(Node): how many of the following partitions pass
            %% before Node owns one again (length(Rest) if it never does).
            StepsToNext = fun(Node) ->
                                  length(lists:takewhile(
                                           fun({_, Owner}) -> Node /= Owner end,
                                           Rest))
                          end,
            case lists:filter(fun(N) ->
                                      Next = StepsToNext(N),
                                      (Next+1 >= TargetN)
                                          orelse (Next == length(Rest))
                              end,
                              Candidates) of
                [] ->
                    target_n_fail;
                Qualifiers ->
                    %% these nodes don't violate target_n forward
                    %% Pick one at random with the (deprecated) random module.
                    Chosen = lists:nth(random:uniform(length(Qualifiers)),
                                       Qualifiers),
                    %% choose one, and do the rest of the ring
                    attempt_simple_transfer(
                      riak_core_ring:transfer_node(P, Chosen, Ring),
                      Rest, TargetN, Exit, Idx+1,
                      lists:keyreplace(Chosen, 1, Last, {Chosen, Idx}))
            end
    end;
attempt_simple_transfer(Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
    %% just keep track of seeing this node
    %% (keyreplace leaves Last unchanged when N is not a remaining claimant)
    attempt_simple_transfer(Ring, Rest, TargetN, Exit, Idx+1,
                            lists:keyreplace(N, 1, Last, {N, Idx}));
attempt_simple_transfer(Ring, [], _, _, _, _) ->
    %% Every partition of the exiting node was reassigned.
    {ok, Ring}.