diff --git a/gpbft/errors.go b/gpbft/errors.go index 3ec3dff7..5b37a7e6 100644 --- a/gpbft/errors.go +++ b/gpbft/errors.go @@ -25,6 +25,9 @@ var ( // // See SupplementalData. ErrValidationWrongSupplement = newValidationError("unexpected supplemental data") + // ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance, + // and is not worth propagating to others. + ErrValidationNotRelevant = newValidationError("message is valid but not relevant") // ErrReceivedWrongInstance signals that a message is received with mismatching instance ID. ErrReceivedWrongInstance = errors.New("received message for wrong instance") diff --git a/gpbft/errors_test.go b/gpbft/errors_test.go index fc8ff105..d37ea582 100644 --- a/gpbft/errors_test.go +++ b/gpbft/errors_test.go @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) { {name: "ErrValidationInvalid", subject: ErrValidationInvalid}, {name: "ErrValidationWrongBase", subject: ErrValidationWrongBase}, {name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement}, + {name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/gpbft/gpbft.go b/gpbft/gpbft.go index c024d8ba..3d65da33 100644 --- a/gpbft/gpbft.go +++ b/gpbft/gpbft.go @@ -9,6 +9,7 @@ import ( "math" "slices" "sort" + "sync" "time" "github.com/filecoin-project/go-bitfield" @@ -158,9 +159,13 @@ type instance struct { powerTable PowerTable // The beacon value from the base chain, used for tickets in this instance. beacon []byte - // Current round number. + // roundPhaseMutex guards access to round and phase. + roundPhaseMutex sync.RWMutex + // round represents the current round number. This field must not be accessed + // directly. See: getCurrentRound, setCurrentRound, IncrementAndGetCurrentRound. round uint64 - // Current phase in the round. + // phase represents the current phase in the round. This field must not be + // accessed directly. See: getCurrentPhase, setCurrentPhase. phase Phase // Time at which the current phase can or must end. // For QUALITY, PREPARE, and COMMIT, this is the latest time (the phase can end sooner). 
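The host.go hunk further down consumes the new sentinel; before continuing with the gpbft.go changes, here is a minimal sketch of the intended branching in a pubsub validator. The helper name and the exact ordering of cases are illustrative assumptions, not part of this change:

```go
package main

import (
	"errors"

	"github.com/filecoin-project/go-f3/gpbft"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// classifyValidation is a hypothetical helper: ErrValidationNotRelevant means
// the message is well-formed and honest, so the sender is not penalized, but
// the message is not worth relaying any further.
func classifyValidation(err error) pubsub.ValidationResult {
	switch {
	case err == nil:
		return pubsub.ValidationAccept
	case errors.Is(err, gpbft.ErrValidationNotRelevant):
		// Valid but cannot aid progress: drop without penalizing the sender.
		return pubsub.ValidationIgnore
	case errors.Is(err, gpbft.ErrValidationInvalid):
		// Malformed or dishonest message: reject and penalize.
		return pubsub.ValidationReject
	default:
		return pubsub.ValidationIgnore
	}
}
```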
@@ -222,19 +227,12 @@ func newInstance( if input.IsZero() { return nil, fmt.Errorf("input is empty") } - metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrInitialPhase)) - metrics.currentInstance.Record(context.TODO(), int64(instanceID)) - metrics.currentPhase.Record(context.TODO(), int64(INITIAL_PHASE)) - metrics.currentRound.Record(context.TODO(), 0) - - return &instance{ + i := instance{ participant: participant, instanceID: instanceID, input: input, powerTable: powerTable, beacon: beacon, - round: 0, - phase: INITIAL_PHASE, supplementalData: data, proposal: input, value: ECChain{}, @@ -247,7 +245,16 @@ func newInstance( }, decision: newQuorumState(powerTable), tracer: participant.tracer, - }, nil + } + i.setCurrentRound(0) + i.setCurrentPhase(INITIAL_PHASE) + + metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrInitialPhase)) + metrics.currentInstance.Record(context.TODO(), int64(instanceID)) + metrics.currentPhase.Record(context.TODO(), int64(INITIAL_PHASE)) + metrics.currentRound.Record(context.TODO(), 0) + + return &i, nil } type roundState struct { @@ -327,7 +334,7 @@ func (i *instance) ReceiveAlarm() error { } func (i *instance) Describe() string { - return fmt.Sprintf("{%d}, round %d, phase %s", i.instanceID, i.round, i.phase) + return fmt.Sprintf("{%d}, round %d, phase %s", i.instanceID, i.getCurrentRound(), i.getCurrentPhase()) } // Processes a single message. @@ -350,11 +357,12 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { ErrValidationWrongBase, &msg.Vote.Value, i.input.Base()) } - if i.phase == TERMINATED_PHASE { + phase := i.getCurrentPhase() + if phase == TERMINATED_PHASE { return false, nil // No-op } // Ignore CONVERGE and PREPARE messages for prior rounds. - forPriorRound := msg.Vote.Round < i.round + forPriorRound := msg.Vote.Round < i.getCurrentRound() if (forPriorRound && msg.Vote.Phase == CONVERGE_PHASE) || (forPriorRound && msg.Vote.Phase == PREPARE_PHASE) { return false, nil @@ -363,7 +371,7 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { // Drop message that: // * belong to future rounds, beyond the configured max lookahead threshold, and // * carry no justification, i.e. are spammable. - beyondMaxLookaheadRounds := msg.Vote.Round > i.round+i.participant.maxLookaheadRounds + beyondMaxLookaheadRounds := msg.Vote.Round > i.getCurrentRound()+i.participant.maxLookaheadRounds if beyondMaxLookaheadRounds && isSpammable(msg) { return false, nil } @@ -378,7 +386,7 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value) // If the instance has surpassed QUALITY phase, update the candidates based // on possible quorum of input prefixes. - if i.phase != QUALITY_PHASE { + if phase != QUALITY_PHASE { return true, i.updateCandidatesFromQuality() } case CONVERGE_PHASE: @@ -399,12 +407,12 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { // to a new round. Late-arriving COMMITs can still (must) cause a local decision, // *in that round*. Try to complete the COMMIT phase for the round specified by // the message. 
- if i.phase != DECIDE_PHASE { + if phase != DECIDE_PHASE { return true, i.tryCommit(msg.Vote.Round) } case DECIDE_PHASE: i.decision.Receive(msg.Sender, msg.Vote.Value, msg.Signature) - if i.phase != DECIDE_PHASE { + if phase != DECIDE_PHASE { i.skipToDecide(msg.Vote.Value, msg.Justification) } default: @@ -435,7 +443,7 @@ func (i *instance) postReceive(roundsReceived ...uint64) { func (i *instance) shouldSkipToRound(round uint64, state *roundState) (ECChain, *Justification, bool) { // Check if the given round is ahead of current round and this instance is not in // DECIDE phase. - if round <= i.round || i.phase == DECIDE_PHASE { + if round <= i.getCurrentRound() || i.getCurrentPhase() == DECIDE_PHASE { return nil, nil, false } if !state.prepared.ReceivedFromWeakQuorum() { @@ -453,8 +461,9 @@ func (i *instance) shouldSkipToRound(round uint64, state *roundState) (ECChain, // Attempts to complete the current phase and round. func (i *instance) tryCurrentPhase() error { - i.log("try phase %s", i.phase) - switch i.phase { + phase := i.getCurrentPhase() + i.log("try phase %s", phase) + switch phase { case QUALITY_PHASE: return i.tryQuality() case CONVERGE_PHASE: @@ -462,26 +471,26 @@ func (i *instance) tryCurrentPhase() error { case PREPARE_PHASE: return i.tryPrepare() case COMMIT_PHASE: - return i.tryCommit(i.round) + return i.tryCommit(i.getCurrentRound()) case DECIDE_PHASE: return i.tryDecide() case TERMINATED_PHASE: return nil // No-op default: - return fmt.Errorf("unexpected phase %s", i.phase) + return fmt.Errorf("unexpected phase %s", phase) } } // Sends this node's QUALITY message and begins the QUALITY phase. func (i *instance) beginQuality() error { - if i.phase != INITIAL_PHASE { - return fmt.Errorf("cannot transition from %s to %s", i.phase, QUALITY_PHASE) + if phase := i.getCurrentPhase(); phase != INITIAL_PHASE { + return fmt.Errorf("cannot transition from %s to %s", phase, QUALITY_PHASE) } // Broadcast input value and wait to receive from others. - i.phase = QUALITY_PHASE + i.setCurrentPhase(QUALITY_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() - i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil) + i.broadcast(i.getCurrentRound(), QUALITY_PHASE, i.proposal, false, nil) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrQualityPhase)) metrics.currentPhase.Record(context.TODO(), int64(QUALITY_PHASE)) return nil @@ -489,8 +498,8 @@ func (i *instance) beginQuality() error { // Attempts to end the QUALITY phase and begin PREPARE based on current state. func (i *instance) tryQuality() error { - if i.phase != QUALITY_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, QUALITY_PHASE) + if phase := i.getCurrentPhase(); phase != QUALITY_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, QUALITY_PHASE) } // Wait either for a strong quorum that agree on our proposal, or for the timeout @@ -527,36 +536,38 @@ func (i *instance) updateCandidatesFromQuality() error { // beginConverge initiates CONVERGE_PHASE justified by the given justification. func (i *instance) beginConverge(justification *Justification) { - if justification.Vote.Round != i.round-1 { + round := i.getCurrentRound() + if justification.Vote.Round != round-1 { // For safety assert that the justification given belongs to the right round. 
panic("justification for which to begin converge does not belong to expected round") } - i.phase = CONVERGE_PHASE + i.setCurrentPhase(CONVERGE_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() // Notify the round's convergeState that the self participant has begun the // CONVERGE phase. Because, we cannot guarantee that the CONVERGE message // broadcasts are delivered to self synchronously. - i.getRound(i.round).converged.SetSelfValue(i.proposal, justification) + i.getRound(round).converged.SetSelfValue(i.proposal, justification) - i.broadcast(i.round, CONVERGE_PHASE, i.proposal, true, justification) + i.broadcast(round, CONVERGE_PHASE, i.proposal, true, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrConvergePhase)) metrics.currentPhase.Record(context.TODO(), int64(CONVERGE_PHASE)) } // Attempts to end the CONVERGE phase and begin PREPARE based on current state. func (i *instance) tryConverge() error { - if i.phase != CONVERGE_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, CONVERGE_PHASE) + if phase := i.getCurrentPhase(); phase != CONVERGE_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, CONVERGE_PHASE) } // The CONVERGE phase timeout doesn't wait to hear from >⅔ of power. timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout) if !timeoutExpired { return nil } - commitRoundState := i.getRound(i.round - 1).committed + round := i.getCurrentRound() + commitRoundState := i.getRound(round - 1).committed isValidConvergeValue := func(cv ConvergeValue) bool { // If it is in candidate set @@ -572,7 +583,7 @@ func (i *instance) tryConverge() error { return possibleDecision } - winner := i.getRound(i.round).converged.FindBestTicketProposal(isValidConvergeValue) + winner := i.getRound(round).converged.FindBestTicketProposal(isValidConvergeValue) if !winner.IsValid() { return fmt.Errorf("no values at CONVERGE") } @@ -594,22 +605,23 @@ func (i *instance) tryConverge() error { // Sends this node's PREPARE message and begins the PREPARE phase. func (i *instance) beginPrepare(justification *Justification) { // Broadcast preparation of value and wait for everyone to respond. - i.phase = PREPARE_PHASE + i.setCurrentPhase(PREPARE_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() - i.broadcast(i.round, PREPARE_PHASE, i.value, false, justification) + i.broadcast(i.getCurrentRound(), PREPARE_PHASE, i.value, false, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrPreparePhase)) metrics.currentPhase.Record(context.TODO(), int64(PREPARE_PHASE)) } // Attempts to end the PREPARE phase and begin COMMIT based on current state. 
func (i *instance) tryPrepare() error { - if i.phase != PREPARE_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, PREPARE_PHASE) + + if phase := i.getCurrentPhase(); phase != PREPARE_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, PREPARE_PHASE) } - prepared := i.getRound(i.round).prepared + prepared := i.getRound(i.getCurrentRound()).prepared proposalKey := i.proposal.Key() foundQuorum := prepared.HasStrongQuorumFor(proposalKey) timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout) @@ -626,7 +638,7 @@ func (i *instance) tryPrepare() error { i.beginCommit() } else if timedOut { if err := i.tryRebroadcast(); err != nil { - return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err) + return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err) } } @@ -634,7 +646,8 @@ func (i *instance) tryPrepare() error { } func (i *instance) beginCommit() { - i.phase = COMMIT_PHASE + round := i.getCurrentRound() + i.setCurrentPhase(COMMIT_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() @@ -643,15 +656,15 @@ func (i *instance) beginCommit() { // No justification is required for committing bottom. var justification *Justification if !i.value.IsZero() { - if quorum, ok := i.getRound(i.round).prepared.FindStrongQuorumFor(i.value.Key()); ok { + if quorum, ok := i.getRound(round).prepared.FindStrongQuorumFor(i.value.Key()); ok { // Found a strong quorum of PREPARE, build the justification for it. - justification = i.buildJustification(quorum, i.round, PREPARE_PHASE, i.value) + justification = i.buildJustification(quorum, round, PREPARE_PHASE, i.value) } else { panic("beginCommit with no strong quorum for non-bottom value") } } - i.broadcast(i.round, COMMIT_PHASE, i.value, false, justification) + i.broadcast(round, COMMIT_PHASE, i.value, false, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrCommitPhase)) metrics.currentPhase.Record(context.TODO(), int64(COMMIT_PHASE)) } @@ -673,7 +686,7 @@ func (i *instance) tryCommit(round uint64) error { // influencing that decision against their interest, just accepting it. i.value = quorumValue i.beginDecide(round) - case i.round != round, i.phase != COMMIT_PHASE: + case i.getCurrentRound() != round, i.getCurrentPhase() != COMMIT_PHASE: // We are at a phase other than COMMIT or round does not match the current one; // nothing else to do. case foundStrongQuorum: @@ -703,14 +716,14 @@ func (i *instance) tryCommit(round uint64) error { case timedOut: // The phase has timed out. Attempt to re-broadcast messages. if err := i.tryRebroadcast(); err != nil { - return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err) + return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err) } } return nil } func (i *instance) beginDecide(round uint64) { - i.phase = DECIDE_PHASE + i.setCurrentPhase(DECIDE_PHASE) i.resetRebroadcastParams() var justification *Justification // Value cannot be empty here. @@ -735,7 +748,7 @@ func (i *instance) beginDecide(round uint64) { // without waiting for a strong quorum of COMMITs in any round. // The provided justification must justify the value being decided. 
 func (i *instance) skipToDecide(value ECChain, justification *Justification) {
-	i.phase = DECIDE_PHASE
+	i.setCurrentPhase(DECIDE_PHASE)
 	i.proposal = value
 	i.value = i.proposal
 	i.resetRebroadcastParams()
@@ -756,13 +769,46 @@ func (i *instance) tryDecide() error {
 		}
 	} else {
 		if err := i.tryRebroadcast(); err != nil {
-			return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err)
+			return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err)
 		}
 	}
 	return nil
 }
 
+func (i *instance) getCurrentRound() uint64 {
+	i.roundPhaseMutex.RLock()
+	defer i.roundPhaseMutex.RUnlock()
+	return i.round
+}
+
+func (i *instance) setCurrentRound(r uint64) {
+	i.roundPhaseMutex.Lock()
+	defer i.roundPhaseMutex.Unlock()
+	i.round = r
+}
+
+// IncrementAndGetCurrentRound increments the current round by one and returns
+// the resulting round.
+func (i *instance) IncrementAndGetCurrentRound() uint64 {
+	i.roundPhaseMutex.Lock()
+	defer i.roundPhaseMutex.Unlock()
+	i.round += 1
+	return i.round
+}
+
+func (i *instance) getCurrentPhase() Phase {
+	i.roundPhaseMutex.RLock()
+	defer i.roundPhaseMutex.RUnlock()
+	return i.phase
+}
+
+func (i *instance) setCurrentPhase(p Phase) {
+	i.roundPhaseMutex.Lock()
+	defer i.roundPhaseMutex.Unlock()
+	i.phase = p
+}
+
 func (i *instance) getRound(r uint64) *roundState {
 	round, ok := i.rounds[r]
 	if !ok {
@@ -773,18 +819,18 @@ func (i *instance) getRound(r uint64) *roundState {
 	}
 }
 
 func (i *instance) beginNextRound() {
-	i.log("moving to round %d with %s", i.round+1, i.proposal.String())
-	i.round += 1
-	metrics.currentRound.Record(context.TODO(), int64(i.round))
+	nextRound := i.IncrementAndGetCurrentRound()
+	i.log("moving to round %d with %s", nextRound, i.proposal.String())
+	metrics.currentRound.Record(context.TODO(), int64(nextRound))
 
-	prevRound := i.getRound(i.round - 1)
+	prevRound := i.getRound(nextRound - 1)
 	// Proposal was updated at the end of COMMIT phase to be some value for which
 	// this node received a COMMIT message (bearing justification), if there were any.
 	// If there were none, there must have been a strong quorum for bottom instead.
 	var justification *Justification
 	if quorum, ok := prevRound.committed.FindStrongQuorumFor(""); ok {
 		// Build justification for strong quorum of COMMITs for bottom in the previous round.
-		justification = i.buildJustification(quorum, i.round-1, COMMIT_PHASE, ECChain{})
+		justification = i.buildJustification(quorum, nextRound-1, COMMIT_PHASE, ECChain{})
 	} else {
 		// Extract the justification received from some participant (possibly this node itself).
 		justification, ok = prevRound.committed.receivedJustification[i.proposal.Key()]
@@ -800,13 +846,13 @@ func (i *instance) beginNextRound() {
 //
 // See shouldSkipToRound.
func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justification) { - i.log("skipping from round %d to round %d with %s", i.round, round, i.proposal.String()) - i.round = round - metrics.currentRound.Record(context.TODO(), int64(i.round)) + i.log("skipping from round %d to round %d with %s", i.getCurrentRound(), round, i.proposal.String()) + i.setCurrentRound(round) + metrics.currentRound.Record(context.TODO(), int64(round)) metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToRound)) if justification.Vote.Phase == PREPARE_PHASE { - i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, i.round) + i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, round) i.addCandidate(chain) i.proposal = chain } @@ -838,18 +884,19 @@ func (i *instance) addCandidate(c ECChain) bool { } func (i *instance) terminate(decision *Justification) { - i.log("✅ terminated %s during round %d", &i.value, i.round) - i.phase = TERMINATED_PHASE + round := i.getCurrentRound() + i.log("✅ terminated %s during round %d", &i.value, round) + i.setCurrentPhase(TERMINATED_PHASE) i.value = decision.Vote.Value i.terminationValue = decision i.resetRebroadcastParams() metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrTerminatedPhase)) - metrics.roundHistogram.Record(context.TODO(), int64(i.round)) + metrics.roundHistogram.Record(context.TODO(), int64(round)) metrics.currentPhase.Record(context.TODO(), int64(TERMINATED_PHASE)) } func (i *instance) terminated() bool { - return i.phase == TERMINATED_PHASE + return i.getCurrentPhase() == TERMINATED_PHASE } func (i *instance) broadcast(round uint64, phase Phase, value ECChain, createTicket bool, justification *Justification) { @@ -895,7 +942,7 @@ func (i *instance) tryRebroadcast() error { // not have any phase timeout and may be too far in the past. // * Otherwise, use the phase timeout. var rebroadcastTimeoutOffset time.Time - if i.phase == DECIDE_PHASE { + if i.getCurrentPhase() == DECIDE_PHASE { rebroadcastTimeoutOffset = i.participant.host.Time() } else { rebroadcastTimeoutOffset = i.phaseTimeout @@ -978,7 +1025,7 @@ func (i *instance) rebroadcast() error { // Returns the absolute time at which the alarm will fire. func (i *instance) alarmAfterSynchrony() time.Time { delta := time.Duration(float64(i.participant.delta) * - math.Pow(i.participant.deltaBackOffExponent, float64(i.round))) + math.Pow(i.participant.deltaBackOffExponent, float64(i.getCurrentRound()))) timeout := i.participant.host.Time().Add(2 * delta) i.participant.host.SetAlarm(timeout) return timeout @@ -1007,7 +1054,7 @@ func (i *instance) log(format string, args ...any) { if i.tracer != nil { msg := fmt.Sprintf(format, args...) 
 		i.tracer.Log("{%d}: %s (round %d, phase %s, proposal %s, value %s)", i.instanceID, msg,
-			i.round, i.phase, &i.proposal, &i.value)
+			i.getCurrentRound(), i.getCurrentPhase(), &i.proposal, &i.value)
 	}
 }
diff --git a/gpbft/gpbft_test.go b/gpbft/gpbft_test.go
index 4dd9d5f1..506eb18d 100644
--- a/gpbft/gpbft_test.go
+++ b/gpbft/gpbft_test.go
@@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) {
 	}
 	driver.RequireDeliverMessage(newQuality)
 	driver.RequireDeliverMessage(newDecide0)
-	driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
+	// No quorum of DECIDEs yet: the message is still valid, but considered not relevant.
+	driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant")
 	driver.RequireDeliverMessage(newDecide1)
 
 	// Once we've received two decides, we should reject messages from the "new" instance.
diff --git a/gpbft/participant.go b/gpbft/participant.go
index 07a1d8ee..d37a008f 100644
--- a/gpbft/participant.go
+++ b/gpbft/participant.go
@@ -33,7 +33,7 @@ type Participant struct {
 	// if both are to be taken.
 	apiMutex sync.Mutex
 	// Mutex protecting currentInstance and committees cache for concurrent validation.
-	// Note that not every access need be protected:
+	// Note that not every access needs to be protected:
 	// - writes to currentInstance, and reads from it during validation,
 	// - reads from or writes to committees (which is written during validation).
 	instanceMutex sync.Mutex
@@ -41,8 +41,13 @@ type Participant struct {
 	currentInstance uint64
 	// Cache of committees for the current or future instances.
 	committees map[uint64]*committee
-
-	// Current Granite instance.
+	// gpbftMutex protects concurrent access to gpbft. Note that only the following
+	// accesses are protected:
+	// - beginInstance called via alarm to start a new GPBFT instance.
+	// - calls to isMessageRelevant during validation to access current round and phase.
+	// - calls to finishCurrentInstance to terminate the current GPBFT instance.
+	gpbftMutex sync.Mutex
+	// gpbft represents the current GPBFT instance.
 	gpbft *instance
 	// Messages queued for future instances.
 	mqueue *messageQueue
@@ -127,10 +132,12 @@ func (p *Participant) CurrentRound() uint64 {
 		panic("concurrent API method invocation")
 	}
 	defer p.apiMutex.Unlock()
+	p.gpbftMutex.Lock()
+	defer p.gpbftMutex.Unlock()
 	if p.gpbft == nil {
 		return 0
 	}
-	return p.gpbft.round
+	return p.gpbft.getCurrentRound()
 }
 
 func (p *Participant) CurrentInstance() uint64 {
@@ -174,6 +181,9 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
 	} else if alreadyValidated, err := p.validationCache.Contains(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
 		log.Errorw("failed to check already validated messages", "err", err)
 	} else if alreadyValidated {
+		if !p.isMessageRelevant(msg) {
+			return nil, ErrValidationNotRelevant
+		}
 		metrics.validationCache.Add(context.TODO(), 1, metric.WithAttributes(attrCacheHit, attrCacheKindMessage))
 		return &validatedMessage{msg: msg}, nil
 	} else {
@@ -243,6 +253,11 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
 		return nil, fmt.Errorf("message %v has unexpected justification: %w", msg, ErrValidationInvalid)
 	}
 
+	if !p.isMessageRelevant(msg) {
+		// Message is valid but no longer relevant in the context of progressing GPBFT.
+		return nil, ErrValidationNotRelevant
+	}
+
 	if cacheMessage {
 		if _, err := p.validationCache.Add(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
 			log.Warnw("failed to cache to already validated message", "err", err)
@@ -366,6 +381,49 @@ func (p *Participant) validateJustification(msg *GMessage, comt *committee) erro
 	return nil
 }
 
+// isMessageRelevant determines whether, to the best of our knowledge, a given
+// message can aid the progress of the current or a future instance.
+func (p *Participant) isMessageRelevant(msg *GMessage) bool {
+	var currentRound uint64
+	var currentPhase Phase
+	p.gpbftMutex.Lock()
+	if p.gpbft != nil {
+		currentRound = p.gpbft.getCurrentRound()
+		currentPhase = p.gpbft.getCurrentPhase()
+	}
+	p.gpbftMutex.Unlock()
+
+	p.instanceMutex.Lock()
+	defer p.instanceMutex.Unlock()
+	// Relevant messages are:
+	// 1. DECIDE messages from the previous instance,
+	// 2. some messages from the current instance (pending further checks), or
+	// 3. any message from a future instance.
+	switch msg.Vote.Instance {
+	case p.currentInstance - 1:
+		return msg.Vote.Phase == DECIDE_PHASE
+	case p.currentInstance:
+		// Message is from the current instance; proceed to check other conditions.
+	default:
+		return msg.Vote.Instance > p.currentInstance
+	}
+
+	// When we are in the DECIDE phase the only relevant messages are DECIDE messages.
+	// Otherwise, relevant messages are QUALITY or DECIDE messages, or messages from
+	// the previous, current or any future round.
+	switch {
+	case currentPhase == DECIDE_PHASE:
+		return msg.Vote.Phase == DECIDE_PHASE
+	case msg.Vote.Phase == QUALITY_PHASE,
+		msg.Vote.Phase == DECIDE_PHASE,
+		msg.Vote.Round == currentRound-1, // Explicit previous-round case avoids uint64 wrap-around.
+		msg.Vote.Round >= currentRound:
+		return true
+	default:
+		return false
+	}
+}
+
 // Receives a validated Granite message from some other participant.
 func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {
 	if !p.apiMutex.TryLock() {
@@ -407,6 +465,8 @@ func (p *Participant) ReceiveAlarm() (err error) {
 		panic("concurrent API method invocation")
 	}
 	defer p.apiMutex.Unlock()
+	p.gpbftMutex.Lock()
+	defer p.gpbftMutex.Unlock()
 	defer func() {
 		if r := recover(); r != nil {
 			err = newPanicError(r)
@@ -415,11 +475,12 @@ func (p *Participant) ReceiveAlarm() (err error) {
 			metrics.errorCounter.Add(context.TODO(), 1, metric.WithAttributes(metricAttributeFromError(err)))
 		}
 	}()
-
 	if p.gpbft == nil {
 		// The alarm is for fetching the next chain and beginning a new instance.
-		return p.beginInstance()
+		err := p.beginInstance()
+		return err
 	}
+
 	if err := p.gpbft.ReceiveAlarm(); err != nil {
 		return fmt.Errorf("failed receiving alarm: %w", err)
 	}
@@ -512,10 +573,12 @@ func (p *Participant) handleDecision() {
 }
 
 func (p *Participant) finishCurrentInstance() *Justification {
+	p.gpbftMutex.Lock()
+	defer p.gpbftMutex.Unlock()
 	var decision *Justification
 	if p.gpbft != nil {
 		decision = p.gpbft.terminationValue
-		p.terminatedDuringRound = p.gpbft.round
+		p.terminatedDuringRound = p.gpbft.getCurrentRound()
 		p.validationCache.RemoveGroup(p.gpbft.instanceID)
 	}
 	p.gpbft = nil
@@ -542,7 +605,7 @@ func (p *Participant) beginNextInstance(nextInstance uint64) {
 }
 
 func (p *Participant) terminated() bool {
-	return p.gpbft != nil && p.gpbft.phase == TERMINATED_PHASE
+	return p.gpbft != nil && p.gpbft.getCurrentPhase() == TERMINATED_PHASE
 }
 
 func (p *Participant) Describe() string {
diff --git a/host.go b/host.go
index 1d5bdf87..ab1bf5f3 100644
--- a/host.go
+++ b/host.go
@@ -333,6 +333,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
 	case errors.Is(err, gpbft.ErrValidationTooOld):
 		// we got the message too late
 		return pubsub.ValidationIgnore
+	case errors.Is(err, gpbft.ErrValidationNotRelevant):
+		// The message is valid but will not effectively aid the progress of GPBFT.
+		// Ignore it to stop its further propagation across the network.
+		return pubsub.ValidationIgnore
 	case errors.Is(err, gpbft.ErrValidationNoCommittee):
 		log.Debugf("commitee error during validation: %+v", err)
 		return pubsub.ValidationIgnore
diff --git a/sim/network.go b/sim/network.go
index 618e0faf..c1a50799 100644
--- a/sim/network.go
+++ b/sim/network.go
@@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
 	)
 }
 
+// HasMoreTicks checks whether there are any messages left to propagate among
+// the network participants. See Tick.
+func (n *Network) HasMoreTicks() bool {
+	return n.queue.Len() > 0
+}
+
-// Tick disseminates one message among participants and returns whether there are
-// any more messages to process.
-func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
+// Tick disseminates one message among participants. Use HasMoreTicks to check
+// whether any messages remain to be processed.
+func (n *Network) Tick(adv *adversary.Adversary) error {
 	msg := n.queue.Remove()
 	n.clock = msg.deliverAt
 	receiver, found := n.participants[msg.dest]
 	if !found {
-		return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
+		return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
 	}
 	switch payload := msg.payload.(type) {
 	case string:
 		if payload != "ALARM" {
-			return false, fmt.Errorf("unknwon string message payload: %s", payload)
+			return fmt.Errorf("unknown string message payload: %s", payload)
 		}
 		n.log(TraceRecvd, "P%d %s", msg.source, payload)
 		if err := receiver.ReceiveAlarm(); err != nil {
-			return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
+			return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
 		}
 	case gpbft.GMessage:
 		// If GST has not elapsed, check if adversary allows the propagation of message.
@@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
 			} else if !adv.AllowMessage(msg.source, msg.dest, payload) {
 				// GST has not passed and adversary blocks the delivery of message; proceed to
 				// next tick.
-				return n.queue.Len() > 0, nil
+				return nil
 			}
 		}
 		validated, err := receiver.ValidateMessage(&payload)
 		if err != nil {
 			if errors.Is(err, gpbft.ErrValidationTooOld) {
 				// Silently drop old messages.
 				break
 			}
-			return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
+			return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
 		}
 		n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload)
 		if err := receiver.ReceiveMessage(validated); err != nil {
-			return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
+			return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
 		}
 	default:
-		return false, fmt.Errorf("unknown message payload: %v", payload)
+		return fmt.Errorf("unknown message payload: %v", payload)
 	}
-	return n.queue.Len() > 0, nil
+	return nil
 }
 
 func (n *Network) log(level int, format string, args ...interface{}) {
diff --git a/sim/sim.go b/sim/sim.go
index 80516435..6e9b0d4a 100644
--- a/sim/sim.go
+++ b/sim/sim.go
@@ -1,6 +1,7 @@
 package sim
 
 import (
+	"errors"
 	"fmt"
 	"strings"
 
@@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
 	}
 
 	// Run until there are no more messages, meaning termination or deadlock.
-	moreTicks := true
-	for moreTicks {
+	for s.network.HasMoreTicks() {
 		if err := s.ec.Err(); err != nil {
 			return fmt.Errorf("error in decision: %w", err)
 		}
@@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
 				break
 			}
 		}
-		var err error
-		moreTicks, err = s.network.Tick(s.adversary)
-		if err != nil {
+
+		switch err := s.network.Tick(s.adversary); {
+		case errors.Is(err, gpbft.ErrValidationNotRelevant):
+			// Ignore errors signalling valid messages that are no longer useful for the
+			// progress of GPBFT. This can occur in normal operation depending on the
+			// order of delivered messages. In production deployments this error is used
+			// to signal that the message need not be propagated among participants; in
+			// simulation we simply ignore it.
+		case err != nil:
 			return fmt.Errorf("error performing simulation phase: %w", err)
 		}
 	}
diff --git a/test/withhold_test.go b/test/withhold_test.go
index 709944ac..fe20ebed 100644
--- a/test/withhold_test.go
+++ b/test/withhold_test.go
@@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) {
 			}
 			// The adversary could convince the victim to decide a, so all must decide a.
 			require.NoError(t, err)
-			decision := sm.GetInstance(0).GetDecision(0)
-			require.NotNil(t, decision)
-			require.Equal(t, &a, decision)
+			for _, victim := range victims {
+				decision := sm.GetInstance(0).GetDecision(victim)
+				require.NotNil(t, decision)
+				require.Equal(t, &a, decision)
+			}
 		})
 	}
 }
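Two illustrative sketches follow. First, the relevance rules added in participant.go, restated as a self-contained predicate for eyeballing the cases; the standalone function and its parameters are hypothetical and not part of the change:

```go
package main

import "github.com/filecoin-project/go-f3/gpbft"

// relevant is a side-effect-free restatement of Participant.isMessageRelevant.
func relevant(msgInstance, curInstance uint64, msgPhase, curPhase gpbft.Phase, msgRound, curRound uint64) bool {
	switch msgInstance {
	case curInstance - 1:
		// From the previous instance, only DECIDE messages still matter.
		return msgPhase == gpbft.DECIDE_PHASE
	case curInstance:
		// Fall through to the round and phase checks below.
	default:
		// Keep anything from a future instance; drop anything older.
		return msgInstance > curInstance
	}
	if curPhase == gpbft.DECIDE_PHASE {
		// Once deciding, only DECIDE messages can help.
		return msgPhase == gpbft.DECIDE_PHASE
	}
	// QUALITY and DECIDE are always relevant; otherwise only the previous,
	// current and future rounds are. The explicit previous-round equality
	// avoids uint64 wrap-around when curRound is zero.
	return msgPhase == gpbft.QUALITY_PHASE ||
		msgPhase == gpbft.DECIDE_PHASE ||
		msgRound == curRound-1 ||
		msgRound >= curRound
}
```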
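Second, the reshaped simulation API: Tick no longer reports whether messages remain, so a driver polls HasMoreTicks and treats ErrValidationNotRelevant as benign, mirroring Simulation.Run above. A minimal sketch, assuming it lives in package sim; the wrapper function and its name are hypothetical:

```go
package sim

import (
	"errors"
	"fmt"

	"github.com/filecoin-project/go-f3/gpbft"
	"github.com/filecoin-project/go-f3/sim/adversary"
)

// runUntilQuiet drains the network queue, ignoring valid-but-irrelevant
// messages, and stops on the first real error. Tick wraps validation errors
// with %w, so errors.Is can unwrap the sentinel.
func runUntilQuiet(n *Network, adv *adversary.Adversary) error {
	for n.HasMoreTicks() {
		switch err := n.Tick(adv); {
		case errors.Is(err, gpbft.ErrValidationNotRelevant):
			// Valid but unhelpful for progress; benign in simulation.
		case err != nil:
			return fmt.Errorf("simulation tick failed: %w", err)
		}
	}
	return nil
}
```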