fedscale.core.aggregation package
Submodules
fedscale.core.aggregation.aggregator module
- class fedscale.core.aggregation.aggregator.Aggregator(args)
Bases:
JobServiceServicer
This centralized aggregator collects training and testing feedback from executors.
- Parameters:
args (dictionary) – Variable arguments for the FedScale runtime config. Defaults to the setup in arg_parser.py
- CLIENT_EXECUTE_COMPLETION(request, context)
FL clients complete the execution task.
- Parameters:
request (CompleteRequest) – Complete request info from executor.
- Returns:
Server response to job completion request
- Return type:
ServerResponse
- CLIENT_PING(request, context)
Handle client ping requests
- Parameters:
request (PingRequest) – Ping request info from executor.
- Returns:
Server response to ping request
- Return type:
ServerResponse
- CLIENT_REGISTER(request, context)
FL client registers with the aggregator.
- Parameters:
request (RegisterRequest) – Registration request info from executor.
- Returns:
Server response to registration request
- Return type:
ServerResponse
- add_event_handler(client_id, event, meta, data)
Due to the large volume of requests, we will put all events into a queue first.
- Parameters:
client_id (int) – The client id.
event (string) – grpc event MODEL_TEST or UPLOAD_MODEL.
meta (dictionary or string) – Meta message for grpc communication, could be event.
data (dictionary) – Data transferred in grpc communication, could be model parameters, test result.
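The queue-first pattern described for add_event_handler can be sketched as follows. This is a minimal illustration, not FedScale's implementation: the names event_queue, add_event, and drain_events are hypothetical, and a plain deque stands in for whatever queue the aggregator actually uses.

```python
from collections import deque

# Hypothetical stand-in for the aggregator's internal event queue.
event_queue = deque()


def add_event(client_id, event, meta, data):
    """Enqueue a client event instead of handling it inline."""
    event_queue.append((client_id, event, meta, data))


def drain_events():
    """Process queued events in arrival (FIFO) order."""
    handled = []
    while event_queue:
        client_id, event, meta, data = event_queue.popleft()
        # A real handler would branch on `event` here (e.g. MODEL_TEST
        # or UPLOAD_MODEL); this sketch only records what it saw.
        handled.append((client_id, event))
    return handled


add_event(1, "UPLOAD_MODEL", "UPLOAD_MODEL", {"weights": [0.1, 0.2]})
add_event(2, "MODEL_TEST", "MODEL_TEST", {"accuracy": 0.9})
processed = drain_events()
```

Decoupling receipt from processing keeps the request handler fast when many clients report at once.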
- aggregate_client_group_weights(results)
Streaming weight aggregation. Similar to aggregate_client_weights, but each key corresponds to a group of weights (e.g., for TensorFlow).
- Parameters:
results (dictionary) – Client’s training result
- aggregate_client_weights(results)
May aggregate client updates on the fly
- Parameters:
results (dictionary) – client’s training result
[FedAvg] “Communication-Efficient Learning of Deep Networks from Decentralized Data”. H. Brendan McMahan, Eider Moore, Daniel Ramage, Seth Hampson, Blaise Aguera y Arcas. AISTATS, 2017
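Aggregating "on the fly" means the server can fold each client update into a running result without caching all of them. The sketch below shows one way to do that with an incremental mean. It is an assumption-laden illustration: StreamingAverager is a hypothetical name, weights are flat lists of floats, and every client is weighted equally (the FedAvg paper weights clients by their number of samples).

```python
class StreamingAverager:
    """Maintain the running mean of client weight vectors."""

    def __init__(self):
        self.count = 0
        self.mean = None

    def add(self, weights):
        """Fold one client's weights into the running mean."""
        self.count += 1
        if self.mean is None:
            self.mean = list(weights)
        else:
            # Incremental mean: m_new = m_old + (w - m_old) / n
            self.mean = [
                m + (w - m) / self.count for m, w in zip(self.mean, weights)
            ]
        return self.mean


averager = StreamingAverager()
for update in ([1.0, 2.0], [3.0, 4.0], [5.0, 6.0]):
    averager.add(update)
```

Only one copy of the model-sized state is kept, regardless of how many clients report in a round.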
- broadcast_aggregator_events(event)
Issue tasks (events) to aggregator worker processes by adding grpc request event (e.g. MODEL_TEST, MODEL_TRAIN) to event_queue.
- Parameters:
event (string) – grpc event (e.g. MODEL_TEST, MODEL_TRAIN) to add to event_queue.
- client_completion_handler(results)
We may need to keep all updates from clients; if so, the results are appended to the cache.
- Parameters:
results (dictionary) – client’s training result
- client_register_handler(executorId, info)
Triggered upon receiving a new executor registration.
- Parameters:
executorId (int) – Executor Id
info (dictionary) – Executor information
- create_client_task(executorId)
Issue a new client training task to a specific executor.
- Parameters:
executorId (int) – Executor Id.
- Returns:
Training config for new task. (dictionary, PyTorch or TensorFlow module)
- Return type:
tuple
- deserialize_response(responses)
Deserialize the response from executor
- Parameters:
responses (byte stream) – Serialized response from executor.
- Returns:
The deserialized response object from executor.
- Return type:
string, bool, or bytes
- dispatch_client_events(event, clients=None)
Issue tasks (events) to clients
- Parameters:
event (string) – grpc event (e.g. MODEL_TEST, MODEL_TRAIN) to event_queue.
clients (list of int) – target client ids for event.
- executor_info_handler(executorId, info)
Handler for executor registration info. It starts the round once the number of registered executors reaches the requirement.
- Parameters:
executorId (int) – Executor Id
info (dictionary) – Executor information
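The "start once enough executors have registered" behaviour can be illustrated with a small threshold gate. This is a sketch under assumptions: RoundStarter and required_executors are hypothetical names, and the real handler does more work when a round starts.

```python
class RoundStarter:
    """Start the first round once enough executors have registered."""

    def __init__(self, required_executors):
        self.required_executors = required_executors
        self.registered = {}
        self.round_started = False

    def register(self, executor_id, info):
        """Record executor info and report whether the round has started."""
        # Re-registration of the same executor overwrites its info and
        # does not count twice.
        self.registered[executor_id] = info
        if (not self.round_started
                and len(self.registered) >= self.required_executors):
            self.round_started = True
        return self.round_started


starter = RoundStarter(required_executors=2)
first = starter.register(0, {"size": [10, 20]})
repeat = starter.register(0, {"size": [10, 20]})
second = starter.register(1, {"size": [5]})
```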
- get_client_conf(clientId)
Training configurations that will be applied on clients. Developers can further define personalized client config here.
- Parameters:
clientId (int) – The client id.
- Returns:
Client training config.
- Return type:
dictionary
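One way to personalize a per-client config is to layer client-specific overrides on top of shared defaults. The sketch below assumes this approach. make_client_conf and the keys learning_rate and batch_size are hypothetical and are not taken from FedScale.

```python
def make_client_conf(client_id, base_conf, overrides=None):
    """Return the shared config with any per-client overrides applied."""
    conf = dict(base_conf)  # copy so the shared defaults stay untouched
    conf.update((overrides or {}).get(client_id, {}))
    return conf


# Hypothetical defaults and a per-client override table.
base = {"learning_rate": 0.05, "batch_size": 32}
per_client = {7: {"batch_size": 16}}

conf_7 = make_client_conf(7, base, per_client)
conf_8 = make_client_conf(8, base, per_client)
```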
- get_global_model()
Get global model that would be used by all FL clients (in default FL)
- Returns:
Based on the executor’s machine learning framework, initialize and return the model for training.
- Return type:
PyTorch or TensorFlow module
- get_shutdown_config(client_id)
Shutdown config for a client. Developers can further define personalized client config here.
- Parameters:
client_id (int) – Client id.
- Returns:
Shutdown config for new task.
- Return type:
dictionary
- get_test_config(client_id)
Config for FL model testing on clients. Developers can further define personalized client config here.
- Parameters:
client_id (int) – The client id.
- Returns:
The testing config for new task.
- Return type:
dictionary
- init_client_manager(args)
Initialize client sampler
- Parameters:
args (dictionary) – Variable arguments for the FedScale runtime config. Defaults to the setup in arg_parser.py
- Returns:
The client manager class
- Return type:
Currently we implement two client managers:
1. Random client sampler - selects participants randomly in each round. [Ref]: https://arxiv.org/abs/1902.01046
2. Oort sampler - prioritizes clients that both hold data offering the greatest utility for improving model accuracy and can run training quickly. [Ref]: https://www.usenix.org/conference/osdi21/presentation/lai
- init_control_communication()
Create communication channel between coordinator and executor. This channel serves control messages.
- load_client_profile(file_path)
For Simulation Mode: load client profiles/traces
- Parameters:
file_path (string) – File path for the client profiles/traces
- Returns:
Return the client profiles/traces
- Return type:
dictionary
- round_completion_handler()
Triggered upon round completion. It registers the execution info of the last round, broadcasts new tasks to executors, and selects clients for the next round.
- round_weight_handler(last_model)
Update model when the round completes
- Parameters:
last_model (list) – A list of global model weights from the last round.
- run()
Start running the aggregator server by setting up the execution and communication environment and monitoring grpc messages.
- select_participants(select_num_participants, overcommitment=1.3)
Select clients for next round.
- Parameters:
select_num_participants (int) – Number of clients to select.
overcommitment (float) – Overcommit ratio for next round.
- Returns:
The list of sampled client ids.
- Return type:
list of int
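Overcommitment means sampling more clients than strictly needed so that stragglers or dropouts do not stall the round. The sketch below illustrates the idea with uniform random sampling. sample_with_overcommit is a hypothetical helper, and truncating the scaled count with int() and capping it at the pool size are assumptions rather than documented behaviour.

```python
import random


def sample_with_overcommit(client_ids, num_participants,
                           overcommitment=1.3, seed=None):
    """Uniformly sample about num_participants * overcommitment clients."""
    rng = random.Random(seed)
    # Assumption: truncate the scaled count, then cap it at the pool size.
    target = min(int(num_participants * overcommitment), len(client_ids))
    return sorted(rng.sample(client_ids, target))


pool = list(range(100))
chosen = sample_with_overcommit(pool, 10, overcommitment=1.3, seed=0)
capped = sample_with_overcommit([1, 2, 3, 4, 5], 10, seed=0)
```

With 10 requested participants and a ratio of 1.3, 13 clients are sampled; the extra 3 act as slack.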
- serialize_response(responses)
Serialize the response to send to server upon assigned job completion
- Parameters:
responses (ServerResponse) – Serialized response from server.
- Returns:
The serialized response object to server.
- Return type:
bytes
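The serialize and deserialize methods form a round trip between Python objects and bytes. A minimal sketch of such a round trip is shown below, assuming pickle as the encoding (the documentation above does not say which encoding is used). The helper names serialize and deserialize are hypothetical.

```python
import pickle


def serialize(payload):
    """Encode a Python object as bytes for transport."""
    return pickle.dumps(payload)


def deserialize(blob):
    """Decode bytes produced by serialize() back into a Python object."""
    # Note: pickle.loads can execute arbitrary code, so it should only
    # be applied to data from trusted peers.
    return pickle.loads(blob)


message = {"event": "MODEL_TEST", "client_id": 3, "ok": True}
blob = serialize(message)
restored = deserialize(blob)
```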
- setup_seed(seed=1)
Set global random seed for better reproducibility
- Parameters:
seed (int) – random seed
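Seeding matters because client sampling and data shuffling draw from pseudo-random generators. The sketch below shows the effect using only the standard library. set_seed is a hypothetical helper, and it does not claim to cover every generator that setup_seed touches.

```python
import random


def set_seed(seed=1):
    """Seed Python's global pseudo-random generator."""
    random.seed(seed)
    # Other libraries keep their own generator state and need their own
    # calls, e.g. numpy.random.seed(seed) or torch.manual_seed(seed).


set_seed(1)
first_run = [random.random() for _ in range(3)]
set_seed(1)
second_run = [random.random() for _ in range(3)]
```

Re-seeding with the same value reproduces the same sequence, so first_run and second_run are identical.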
- testing_completion_handler(client_id, results)
Each executor will handle a subset of testing dataset
- Parameters:
client_id (int) – The client id.
results (dictionary) – The client test results.
- tictak_client_tasks(sampled_clients, num_clients_to_collect)
Record the execution information of the sampled clients in the last round. In SIMULATION_MODE, further filter sampled_clients and pick the top num_clients_to_collect clients.
- Parameters:
sampled_clients (list of int) – Sampled clients from client manager
num_clients_to_collect (int) – The number of clients actually needed for next round.
- Returns:
Return the sampled clients and client execution information in the last round.
- Return type:
tuple
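The text does not say how the "top" clients are ranked. The sketch below assumes ranking by simulated completion time, fastest first, which is one plausible reading when clients are overcommitted. pick_fastest and completion_time are hypothetical names.

```python
def pick_fastest(sampled_clients, completion_time, num_clients_to_collect):
    """Keep the clients that finish earliest and report the round length."""
    # Assumption: "top" means the shortest simulated completion time.
    ranked = sorted(sampled_clients, key=lambda cid: completion_time[cid])
    kept = ranked[:num_clients_to_collect]
    # The round lasts until the slowest kept client finishes.
    round_duration = max((completion_time[c] for c in kept), default=0.0)
    return kept, round_duration


times = {1: 5.0, 2: 1.0, 3: 3.0, 4: 9.0}
kept, duration = pick_fastest([1, 2, 3, 4], times, 2)
```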
fedscale.core.aggregation.optimizers module
- class fedscale.core.aggregation.optimizers.ServerOptimizer(mode, args, device, sample_seed=233)
Bases:
object
This is an abstract server optimizer class.
- Parameters:
mode (string) – mode of gradient aggregation policy
args (dictionary) – Variable arguments for the FedScale runtime config. Defaults to the setup in arg_parser.py
device (string) – Runtime device type
sample_seed (int) – Random seed
- update_round_gradient(last_model, current_model, target_model)
Update the global model based on the selected aggregation policy.
- Parameters:
last_model (list of tensor weight) – A list of global model weights from the last round.
current_model (list of tensor weight) – A list of global model weights from this round.
target_model (PyTorch or TensorFlow nn module) – Aggregated model.
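A server-side update policy can be viewed as a step from the last global model toward the newly aggregated one. The sketch below shows the simplest such policy, a scaled step along the difference between the two models. It is illustrative only: apply_server_step and server_lr are hypothetical names, weights are flat lists of floats, and the policies that ServerOptimizer actually implements are not described here.

```python
def apply_server_step(last_model, current_model, server_lr=1.0):
    """Move from last_model toward current_model by a factor of server_lr."""
    # The difference (current - last) acts as a pseudo-gradient.
    # With server_lr == 1.0 the result is simply current_model.
    return [
        last + server_lr * (current - last)
        for last, current in zip(last_model, current_model)
    ]


last = [0.0, 1.0]
current = [1.0, 3.0]
half_step = apply_server_step(last, current, server_lr=0.5)
full_step = apply_server_step(last, current, server_lr=1.0)
```

A step size below 1.0 damps the change between rounds, while 1.0 adopts the aggregated weights as they are.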