-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… #24784
Conversation
boolean noNeedRescale = | ||
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() | ||
.map(JobEdge::getDownstreamSubtaskStateMapper) | ||
.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) | ||
&& stateAssignment.executionJobVertex.getInputs().stream() | ||
.map(IntermediateResult::getProducer) | ||
.map(vertexAssignments::get) | ||
.anyMatch( | ||
taskStateAssignment -> { | ||
final int oldParallelism = | ||
stateAssignment | ||
.oldState | ||
.get(stateAssignment.inputOperatorID) | ||
.getParallelism(); | ||
return oldParallelism | ||
== taskStateAssignment.executionJobVertex | ||
.getParallelism(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain the logic behind this condition? Maybe both of it's parts separately?
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
.map(JobEdge::getDownstreamSubtaskStateMapper)
.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
and
stateAssignment.executionJobVertex.getInputs().stream()
.map(IntermediateResult::getProducer)
.map(vertexAssignments::get)
.anyMatch(
taskStateAssignment -> {
final int oldParallelism =
stateAssignment
.oldState
.get(stateAssignment.inputOperatorID)
.getParallelism();
return oldParallelism
== taskStateAssignment.executionJobVertex
.getParallelism();
}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separation is a good idea, ok.
The first part is to check if SubtaskStateMapper.FULL is used in any input (this mapper type always returns all I/O), the problem is reproduced only for this mapper type
The second part is to check if the parallelism of any previous operator has changed. if it has changed, it means that the number of outputs has changed.
But I made little mistake in condition. I will fix it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation.
But I made little mistake in condition. I will fix it
Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mistake was semantic. It was checked that all the inputs were of FULL type, and it is necessary to have at least one. But since the type of partitioner seems to be the same for all inputs, I don't see how to add a test to check it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe in StateAssignmentOperationTest
create a unit test that has one FULL
and one something else, and assert that the assigned states are as they should be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bumping:
Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be?
Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?
sorry to bother you again, but the unit test that you have added still doesn't have test coverage. When I try running your previous version of the code:
boolean noNeedRescale =
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
.map(JobEdge::getDownstreamSubtaskStateMapper)
.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
&& stateAssignment.executionJobVertex.getInputs().stream()
.map(IntermediateResult::getProducer)
.map(vertexAssignments::get)
.anyMatch(
taskStateAssignment -> {
final int oldParallelism =
stateAssignment
.oldState
.get(stateAssignment.inputOperatorID)
.getParallelism();
return oldParallelism
== taskStateAssignment.executionJobVertex
.getParallelism();
});
if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
stateAssignment.inputChannelStates.putAll(
toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
return;
}
the tests in StateAssignmentOperationTest
are still green.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the analysis. I think you can change a test with incomplete FULL, you are right. I hope I didn't make a mistake with the mistake note last time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Earlier, I misjudged the error of not noticing the negation in the condition.
But by substituting anyMatch
for allMatch
, you can see that the test will stop passing.
Also I changed gateIdx from random generation to always zero (in StateHandleDummyUtil#createNewInputChannelStateHandle
). This change have no affects for other tests. But I need zero for this test, because need stability for the number of input gates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again thanks for the explanation. I've left a couple of more comments.
...ain/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java
Outdated
Show resolved
Hide resolved
|
||
@Rule public TemporaryFolder tempFolder = new TemporaryFolder(); | ||
|
||
private static final File CHECKPOINT_FILE = new File("src/test/resources/custom-checkpoint"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can you somehow make the name of the directory more related to the test name? unaligned-checkpoint-custom-partition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I agree
import static org.junit.Assert.fail; | ||
|
||
/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */ | ||
public class UnalignedCheckpointCustomRescaleITCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: UnalignedCheckpointRescaleWithCustomPartitionITCase
?
import static org.junit.Assert.fail; | ||
|
||
/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */ | ||
public class UnalignedCheckpointCustomRescaleITCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, instead of creating custom test, can you create a new Topology
in the pre-existing UnalignedCheckpointRescaleITCase
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I though about it, but in UnalignedCheckpointRescaleITCase
used Long
as stream data. But for the regular repeatability of the test it was necessary to use the String
. I will try to add test to UnalignedCheckpointRescaleITCase
, but I'm not sure. Also in UnalignedCheckpointRescaleITCase
rescale full graph (change parallelism for all vertexes), but need to change only one vertex, not all. I have no ideas, how to add new Topology for this test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But for the regular repeatability of the test it was necessary to use the String.
There are plenty of solutions to this problem:
- You are encoding in your
String
record three fields:partition
,index
and somepayload
. But I don't see you using anything but thepartition
, so you could convert your wholeString
record intoLong
record with value just for thepartition
. - You can encode any number of fields in single
Long
just as well. The easiest would be sth like that:
long encode(int partition, int index, int payload) {
checkArgument(partition < 1024);
checkArgument(index < 1024);
checkArgument(payload < Long.MAX_VALUE / (1024*1024));
return partition + index * 1024 + payload * 1024 * 1024;
}
- You could always most likely change the record type for
UnalignedCheckpointRescaleITCase
.String
there should work just as fine, but that's probably more work vs option 1.
but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex
When creating job graph in Topology
(UnalignedCheckpointRescaleITCase.Topology#create
), you can set parallelism per vertex and AFAIK that will override the env.setParallelism(...)
setting. So for the downstream vertex/task, that you don't want to change parallelism, you can call:
.addSink(new StringSink(createCheckpoint ? 100 : 1000))
.name("sink")
.setParallelism(3);
While keep the upstream source's parallelism controlled by the environment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An important point for test repeatability is to split the record into two parts (one at the sink input, one at the source output). That's why we use a string with a sufficiently long length and all bytes set to 1 to be sure to read data leading to exception in case of splitting.
That's why need to realize 3rd option. But maybe we can don't change Long to String, but just to add StringSource exactly for CustomPartitioner case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could always most likely change the record type for UnalignedCheckpointRescaleITCase. String there should work just as fine, but that's probably more work vs option 1.
I've just realised that I don't see any place in the UnalignedCheckpointRescaleITCase
that hardcodes the type of the record. You can add a new Topology
(UnalignedCheckpointRescaleITCase.Topology
) that creates any JobGraph, so you can keep your proposed records format. Also I wouldn't mind if you re-used the same LongSource
, but changed the record type from Long
to some POJO of Long
and String payload
public static class Record {
private final long value;
private final @Nullable String payload;
(...)
}
where payload
length/size would be configurable, between 0 (payload == null
) to whatever value you would configured (3713
?).
This way you won't be duplicating all of the setup code and you will leverage the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A difficult way out of the situation, but quite feasible, ok. Thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, but I think this is too complicated a way to implement the test. That's why I came up with an idea that is not as elegant and correct as yours, but is easier to implement and allows you to do without changing less code. Please check it out. Thank you for your patience
Also please note that azure is failing on some compilation issue. Probably some check style violation. |
1aa9484
to
f0f82d2
Compare
Sorry to bother you again, but I made a separate one Pull request with hotfix |
ca6f434
to
b4b928e
Compare
…kpoint with custom partitioner Co-authored-by: Andrey Gaskov <31715230+empathy87@users.noreply.github.com>
I checked the MR. I agree with changes about ITCase. In my case I forget about uid, that's why map vertex doesn't work for me in rescale case. Thanks for changes |
What is the purpose of the change
To fix FLINK-35351
Verifying this change
UnalignedCheckpointRescaleITCase Topology.CUSTOM_PARTITIONER
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: ( no)Documentation