package distributed
- Alphabetic
- By Inheritance
- distributed
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- trait DistributedCommunicationNonRoot extends AnyRef
- trait DistributedCommunicationRoot extends AnyRef
- case class LoopState(epoch: Int, lastValidationLoss: Option[Double], minValidationLoss: Option[Double], minValidationEpoch: Option[Int], learningCurve: List[(Int, Double, Option[(Double, Double)])]) extends Product with Serializable
- case class LoopStateWithModelAndOptimizerData(loopState: LoopState, model: Seq[STen], optimizer: Seq[STen], bestModel: Seq[STen]) extends Product with Serializable
Value Members
- def driveDistributedTraining[I, M <: GenericModule[I, Variable], LRState, BatchStreamState, BatchStreamBuffers](nranks: Int, gpu: Int, controlCommunication: DistributedCommunicationRoot, model: SupervisedModel[I, M], optimizerFactory: (Seq[(STen, PTag)]) => Optimizer, trainBatches: () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers], validationBatches: () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers], maxEpochs: Int, checkpointState: Option[(LoopStateWithModelAndOptimizerData, LRState) => IO[Unit]] = None, validationFrequency: Int = 1, returnMinValidationLossModel: Seq[Int] = Nil, learningRateSchedule: LearningRateSchedule[LRState] = LearningRateSchedule.noop, initState: Option[LoopStateWithModelAndOptimizerData] = None, accumulateGradientOverNBatches: Int = 1, learningRateScheduleInitState: Option[LRState] = None, validationLossExponentialSmoothingFactor: Double = 1.0)(implicit arg0: Load[M]): IO[LoopState]
Drives the distributed training loop.
Drives the distributed training loop.
Must be called on the root rank. If nranks is > 1 then followDistributedTraining must be called on the rest of the ranks.
The batch streams across all ranks must:
- not contain empty batches
- have the same number of batches.
Models across all ranks must have the same shape.
Communication is done by two independent communication channels:
- tensor data is sent via NCCL, thus NCCL's requirement for network setup applies (i.e. single private network if distributed) This method will set up and tear down the NCCL communication clique.
- control messages and initial rendez-vous are using an implementation of DistributedCommunicationRoot and DistributedCommunicationNonRoot. This is a very low traffic channel, 1 message before each epoch. An Akka implementation is provided which is suitable for distributed and single-process multi-gpu settings. A within process cats effect implementation is also provided for single-process multi-gpu settings.
When the training is complete, the best model is copied into the tensors of the supplied in SupervisedModel instance.
- def epochs[LRState](maxEpochs: Int, checkpointState: Option[(LoopState, LRState) => IO[Unit]] = None, validationFrequency: Int = 1, returnMinValidationLossModel: Seq[Int] = Nil, learningRateSchedule: LearningRateSchedule[LRState] = LearningRateSchedule.noop, initState: Option[LoopState] = None, learningRateScheduleInitState: Option[LRState] = None, validationLossExponentialSmoothingFactor: Double = 1.0, trainEpoch: (Double) => IO[Double], validationEpoch: Option[IO[Double]], saveMinValidationLossModel: IO[Unit]): IO[LoopState]
Drives multiple epochs to find the minimum of smoothed validation loss
Drives multiple epochs to find the minimum of smoothed validation loss
This method does not explicitly trains a model but assumes there is a side effecting effectful function which steps through an optimizer through a whole epoch worth of batches.
- checkpointState
Function to checkpoint the state managed in this loop.
- validationFrequency
How often (by epoch count) to calculate the validation loss
- returnMinValidationLossModel
In which epocchs to calculat validation loss
- initState
Initial state of the validation loss management state
- learningRateScheduleInitState
Initial state of the learning rate state
- validationLossExponentialSmoothingFactor
Smoothing factor in exponential smoothing of validation loss <= 1.0
- trainEpoch
An effectful function which steps the optimizer over a complete epoch and returns the training loss
- validationEpoch
An effectful function which steps through in forward mode a complete epoch and returns the validation loss
- returns
The final loop state
- def followDistributedTraining[I, M <: GenericModule[I, Variable], LRState, BatchStreamState, BatchStreamBuffers](rank: Int, nranks: Int, gpu: Int, controlCommunication: DistributedCommunicationNonRoot, model: SupervisedModel[I, M], trainBatches: () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers], validationBatches: () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers], accumulateGradientOverNBatches: Int = 1): IO[Unit]
Follows a distributed training loop.
Follows a distributed training loop. See the documentation of driveDistributedTraining.
- def localDataParallelTrainingLoop[I, M <: GenericModule[I, Variable], LRState, BatchStreamState, BatchStreamBuffers](modelsWithDataStreams: Seq[(SupervisedModel[I, M], () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers], () => BatchStream[(I, STen), BatchStreamState, BatchStreamBuffers])], optimizerFactory: (Seq[(STen, PTag)]) => Optimizer, maxEpochs: Int, checkpointState: Option[(LoopStateWithModelAndOptimizerData, LRState) => IO[Unit]] = None, validationFrequency: Int = 1, returnMinValidationLossModel: Seq[Int] = Nil, learningRateSchedule: LearningRateSchedule[LRState] = LearningRateSchedule.noop, initState: Option[LoopStateWithModelAndOptimizerData] = None, accumulateGradientOverNBatches: Int = 1, learningRateScheduleInitState: Option[LRState] = None, validationLossExponentialSmoothingFactor: Double = 1.0)(implicit arg0: Load[M]): IO[LoopState]
Data parallel training loop driving multiple devices from a single process
Data parallel training loop driving multiple devices from a single process
modelsWithDataStreams
is sequence of models, training and validation streams allocated to each devices. The streams must have the same length and must not contain empty batches. Models must have the same shape.Once the returned suspended side effect is completed the trained model is in the *first* model of
modelsWithDataStreams
. - def oneEpoch[I, M <: GenericModule[I, Variable], S, C](model: SupervisedModel[I, M], stepOptimizerFn: Option[(Seq[Option[STen]]) => Unit], batches: BatchStream[(I, STen), S, C], logger: Option[Logger], accumulateGradientOverNBatches: Int, ncclComm: NcclComm, rootRank: Int, device: CudaDevice, forwardOnly: Boolean): IO[Double]
Drives one epoch in the clique All batch streams in all members of the clique *MUST* have the same number of batches otherwise this will never terminate
- object DistributedCommunication
- object LocalCommunication
- object LoopState extends Serializable
- object LoopStateWithModelAndOptimizerData extends Serializable