fedscale.cloud.aggregation package¶
Submodules¶
fedscale.cloud.aggregation.aggregator module¶
- class fedscale.cloud.aggregation.aggregator.Aggregator(args)[source]¶
Bases:
JobServiceServicer
This centralized aggregator collects training/testing feedback from executors.
- Parameters:
args (dictionary) – Variable arguments for fedscale runtime config. Defaults to the setup in arg_parser.py.
- CLIENT_EXECUTE_COMPLETION(request, context)[source]¶
FL clients complete the execution task.
- Parameters:
request (CompleteRequest) – Complete request info from executor.
- Returns:
Server response to job completion request
- Return type:
ServerResponse
- CLIENT_PING(request, context)[source]¶
Handle client ping requests
- Parameters:
request (PingRequest) – Ping request info from executor.
- Returns:
Server response to ping request
- Return type:
ServerResponse
- CLIENT_REGISTER(request, context)[source]¶
An FL TorchClient registers with the aggregator.
- Parameters:
request (RegisterRequest) – Registration request info from executor.
- Returns:
Server response to registration request
- Return type:
ServerResponse
- add_event_handler(client_id, event, meta, data)[source]¶
Due to the large volume of requests, all events are first put into a queue.
- Parameters:
client_id (int) – The client id.
event (string) – grpc event MODEL_TEST or UPLOAD_MODEL.
meta (dictionary or string) – Meta message for grpc communication, could be event.
data (dictionary) – Data transferred in grpc communication, could be model parameters, test result.
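The queue-first pattern described for add_event_handler can be sketched as follows. This is a minimal illustration, not FedScale's implementation: the EventBuffer class and its method names are hypothetical, and a plain collections.deque stands in for whatever queue the aggregator actually uses.

```python
from collections import deque


class EventBuffer:
    """Hypothetical stand-in for the aggregator's event queue (illustrative only)."""

    def __init__(self):
        self._queue = deque()

    def add_event(self, client_id, event, meta, data):
        # Enqueue the event instead of handling it inline, so that a burst
        # of requests does not block the caller.
        self._queue.append((client_id, event, meta, data))

    def drain(self):
        # Pop and return all pending events in arrival (FIFO) order.
        events = []
        while self._queue:
            events.append(self._queue.popleft())
        return events


buffer = EventBuffer()
buffer.add_event(1, "UPLOAD_MODEL", "UPLOAD_MODEL", {"loss": 0.5})
buffer.add_event(2, "MODEL_TEST", "MODEL_TEST", {"accuracy": 0.9})
pending = buffer.drain()
```

Buffering keeps the request handler cheap; the events can then be processed at the aggregator's own pace.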
- broadcast_aggregator_events(event)[source]¶
Issue tasks (events) to aggregator worker processes by adding grpc request event (e.g. MODEL_TEST, MODEL_TRAIN) to event_queue.
- Parameters:
event (string) – grpc event (e.g. MODEL_TEST, MODEL_TRAIN) to event_queue.
- client_completion_handler(results)[source]¶
We may need to keep all updates from clients. If so, the results are appended to the cache.
- Parameters:
results (dictionary) – client’s training result
- client_register_handler(executorId, info)[source]¶
Triggered when a new executor registration is received.
- Parameters:
executorId (int) – Executor Id
info (dictionary) – Executor information
- create_client_task(executorId)[source]¶
Issue a new client training task to a specific executor.
- Parameters:
executorId (int) – Executor Id.
- Returns:
Training config for the new task, as a tuple of (dictionary, PyTorch or TensorFlow module).
- Return type:
tuple
- deserialize_response(responses)[source]¶
Deserialize the response from executor
- Parameters:
responses (byte stream) – Serialized response from executor.
- Returns:
The deserialized response object from executor.
- Return type:
string, bool, or bytes
- dispatch_client_events(event, clients=None)[source]¶
Issue tasks (events) to clients
- Parameters:
event (string) – grpc event (e.g. MODEL_TEST, MODEL_TRAIN) to event_queue.
clients (list of int) – target client ids for event.
- executor_info_handler(executorId, info)[source]¶
Handler for registering executor info. It starts the round once the number of executors reaches the requirement.
- Parameters:
executorId (int) – Executor Id
info (dictionary) – Executor information
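The "start the round once enough executors have registered" behaviour can be sketched as a small gate. This is only an illustration under stated assumptions: RegistrationGate, its required argument, and the boolean return value are hypothetical names, not part of the FedScale API.

```python
class RegistrationGate:
    """Hypothetical helper: reports when enough executors have registered."""

    def __init__(self, required):
        self.required = required      # number of executors needed to start
        self.registered = {}          # executor id -> executor info

    def register(self, executor_id, info):
        # Re-registering the same executor overwrites its info and is
        # counted only once.
        self.registered[executor_id] = info
        return len(self.registered) >= self.required


gate = RegistrationGate(required=2)
first = gate.register(0, {"size": [10, 20]})
duplicate = gate.register(0, {"size": [10, 20]})
second = gate.register(1, {"size": [5]})
```

Here the round would start only on the call that makes the gate return True.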
- get_client_conf(client_id)[source]¶
Training configurations that will be applied on clients. Developers can further define personalized client config here.
- Parameters:
client_id (int) – The client id.
- Returns:
TorchClient training config.
- Return type:
dictionary
- get_shutdown_config(client_id)[source]¶
Shutdown config for the client. Developers can further define personalized client config here.
- Parameters:
client_id (int) – TorchClient id.
- Returns:
Shutdown config for new task.
- Return type:
dictionary
- get_test_config(client_id)[source]¶
Config for FL model testing on clients. Developers can further define personalized client config here.
- Parameters:
client_id (int) – The client id.
- Returns:
The testing config for new task.
- Return type:
dictionary
- init_client_manager(args)[source]¶
Initialize client sampler
- Parameters:
args (dictionary) – Variable arguments for fedscale runtime config. Defaults to the setup in arg_parser.py.
- Returns:
The client manager class
Currently two client managers are implemented:
1. Random client sampler: selects participants randomly in each round. [Ref]: https://arxiv.org/abs/1902.01046
2. Oort sampler: prioritizes clients that have both data offering the greatest utility in improving model accuracy and the capability to run training quickly. [Ref]: https://www.usenix.org/conference/osdi21/presentation/lai
- init_control_communication()[source]¶
Create communication channel between coordinator and executor. This channel serves control messages.
- load_client_profile(file_path)[source]¶
For Simulation Mode: load client profiles/traces
- Parameters:
file_path (string) – File path for the client profiles/traces
- Returns:
Return the client profiles/traces
- Return type:
dictionary
- round_completion_handler()[source]¶
Triggered upon round completion. It registers the last round's execution info, broadcasts new tasks to executors, and selects clients for the next round.
- run()[source]¶
Start running the aggregator server by setting up execution and communication environment, and monitoring the grpc message.
- select_participants(select_num_participants, overcommitment=1.3)[source]¶
Select clients for next round.
- Parameters:
select_num_participants (int) – Number of clients to select.
overcommitment (float) – Overcommit ratio for next round.
- Returns:
The list of sampled client ids.
- Return type:
list of int
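One plausible reading of the overcommitment parameter is that the sampler draws more clients than strictly needed, so that stragglers or dropouts can be tolerated. The sketch below illustrates that reading only. The function name select_with_overcommit, the rounding rule, and the uniform random sampling are assumptions, not confirmed behaviour of select_participants.

```python
import math
import random


def select_with_overcommit(client_ids, num_participants, overcommitment=1.3, seed=None):
    """Hypothetical sketch: sample roughly num_participants * overcommitment clients."""
    rng = random.Random(seed)
    pool = list(client_ids)
    # Assumed rounding rule: round the overcommitted target up,
    # then cap it at the number of available clients.
    target = min(math.ceil(num_participants * overcommitment), len(pool))
    return rng.sample(pool, target)


sampled = select_with_overcommit(range(100), num_participants=10, seed=1)
capped = select_with_overcommit(range(5), num_participants=10, seed=1)
```

When fewer clients are available than the overcommitted target, this sketch simply returns all of them.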
- serialize_response(responses)[source]¶
Serialize the response to send to server upon assigned job completion
- Parameters:
responses (ServerResponse) – Response from server to be serialized.
- Returns:
The serialized response object to server.
- Return type:
bytes
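The serialize/deserialize pair can be pictured as a byte-level round trip. The sketch below assumes pickle as the wire format, which this page does not confirm. The function bodies are illustrative stand-ins and the payload fields are made up.

```python
import pickle


def serialize_response(responses):
    # Assumption: pickle is used to turn a Python object into bytes.
    return pickle.dumps(responses)


def deserialize_response(responses):
    # Inverse of serialize_response. Only unpickle data from trusted peers,
    # since unpickling can execute arbitrary code.
    return pickle.loads(responses)


payload = {"client_id": 3, "event": "UPLOAD_MODEL", "weights": [0.1, 0.2]}
wire = serialize_response(payload)
restored = deserialize_response(wire)
```

The round trip preserves the object, which lets arbitrary Python structures travel inside a bytes field of a gRPC message.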
- setup_seed(seed=1)[source]¶
Set global random seed for better reproducibility
- Parameters:
seed (int) – random seed
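A global seeding helper typically seeds every random number generator in use. The sketch below is an assumption about what such a helper might cover, not the body of setup_seed: it seeds Python's random module and, if it is installed, NumPy. A PyTorch-based runtime would usually also call torch.manual_seed.

```python
import random


def setup_seed(seed=1):
    """Hypothetical sketch of a global seeding helper."""
    random.seed(seed)
    try:
        # NumPy is optional here; skip it if it is not installed.
        import numpy as np
        np.random.seed(seed)
    except ImportError:
        pass


setup_seed(7)
first_draw = random.random()
setup_seed(7)
second_draw = random.random()
```

Re-seeding with the same value reproduces the same sequence of draws, which is what makes runs comparable.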
- testing_completion_handler(client_id, results)[source]¶
Each executor handles a subset of the testing dataset.
- Parameters:
client_id (int) – The client id.
results (dictionary) – The client test results.
- tictak_client_tasks(sampled_clients, num_clients_to_collect)[source]¶
Record the execution information of the sampled clients in the last round. In SIMULATION_MODE, further filter sampled_clients and pick the top num_clients_to_collect clients.
- Parameters:
sampled_clients (list of int) – Sampled clients from client manager
num_clients_to_collect (int) – The number of clients actually needed for next round.
- Returns:
Return the sampled clients and client execution information in the last round.
- Return type:
tuple
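The filtering step in simulation mode can be sketched as ranking the sampled clients and keeping the first num_clients_to_collect. The sketch assumes that "top" means "earliest to finish" and that per-client completion times are available as a dictionary. Both are assumptions, and pick_fastest is a hypothetical name.

```python
def pick_fastest(sampled_clients, completion_times, num_clients_to_collect):
    """Hypothetical sketch: keep the clients that finish earliest."""
    # Rank the sampled clients by simulated completion time, shortest first.
    ranked = sorted(sampled_clients, key=lambda cid: completion_times[cid])
    chosen = ranked[:num_clients_to_collect]
    # The round lasts as long as the slowest client that was kept.
    round_duration = max((completion_times[cid] for cid in chosen), default=0.0)
    return chosen, round_duration


times = {1: 12.0, 2: 5.5, 3: 30.0, 4: 8.0}
chosen, duration = pick_fastest([1, 2, 3, 4], times, 3)
```

Combined with overcommitted sampling, this lets a simulated round drop the slowest clients while still collecting the required number of updates.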
fedscale.cloud.aggregation.android_aggregator module¶
- class fedscale.cloud.aggregation.android_aggregator.Android_Aggregator(args)[source]¶
Bases:
Aggregator
This aggregator collects training/testing feedback from Android MNN apps.
- Parameters:
args (dictionary) – Variable arguments for fedscale runtime config. Defaults to the setup in arg_parser.py.
- deserialize_response(responses)[source]¶
Deserialize the response from executor. If the response contains an MNN JSON model, convert it to a PyTorch state_dict.
- Parameters:
responses (byte stream) – Serialized response from executor.
- Returns:
The deserialized response object from executor.
- Return type:
string, bool, or bytes
- init_model()[source]¶
Load the model architecture and convert it to MNN. NOTE: MNN does not support dropout.
- round_weight_handler(last_model)[source]¶
Update the model when the round completes, then convert the new model to MNN JSON.
- Parameters:
last_model (list) – A list of global model weights in the last round.
- serialize_response(responses)[source]¶
Serialize the response to send to server upon assigned job completion. If the response is a PyTorch model, convert it to mnn_json.
- Parameters:
responses (ServerResponse) – Response from server to be serialized.
- Returns:
The serialized response object to server.
- Return type:
bytes
fedscale.cloud.aggregation.optimizers module¶
- class fedscale.cloud.aggregation.optimizers.TorchServerOptimizer(mode, args, device, sample_seed=233)[source]¶
Bases:
object
This is an abstract server optimizer class.
- Parameters:
mode (string) – mode of gradient aggregation policy
args (dictionary) – Variable arguments for fedscale runtime config. Defaults to the setup in arg_parser.py.
device (string) – Runtime device type
sample_seed (int) – Random seed
- update_round_gradient(last_model, current_model, target_model)[source]¶
Update the global model according to the aggregation policy given by mode.
- Parameters:
last_model (list of tensor weight) – A list of global model weights in the last round.
current_model (list of tensor weight) – A list of global model weights in this round.
target_model (PyTorch or TensorFlow nn module) – Aggregated model.
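As a rough illustration of what a server-side update policy can look like, the sketch below moves the previous global weights toward the newly aggregated weights by a server learning rate. It uses plain floats instead of tensors. The function name apply_server_update and the server_lr parameter are hypothetical, and the policies actually implemented by TorchServerOptimizer may differ.

```python
def apply_server_update(last_model, current_model, server_lr=1.0):
    """Hypothetical sketch: step from last_model toward current_model."""
    # With server_lr == 1.0 the result equals current_model (plain replacement);
    # smaller values damp the change between rounds.
    return [
        last + server_lr * (cur - last)
        for last, cur in zip(last_model, current_model)
    ]


last = [1.0, 2.0]
current = [2.0, 4.0]
full = apply_server_update(last, current)
half = apply_server_update(last, current, server_lr=0.5)
```

Treating the difference between rounds as a pseudo-gradient is what allows adaptive server optimizers to be plugged in at this step.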