Support all NeptuneML API command parameters in neptune_ml magics, accept unified JSON blob for parameter input #202

Merged · 7 commits · Oct 1, 2021
2 changes: 2 additions & 0 deletions ChangeLog.md
@@ -3,6 +3,8 @@
 Starting with v1.31.6, this file will contain a record of major features and updates made in each release of graph-notebook.
 
 ## Upcoming
+- Added full support for NeptuneML API command parameters to `%neptune_ml` ([Link to PR](https://github.com/aws/graph-notebook/pull/202))
+- Allow `%%neptune_ml` to accept a JSON blob as parameter input for most phases ([Link to PR](https://github.com/aws/graph-notebook/pull/202))
- Added `--silent` option for suppressing query output ([PR #1](https://github.com/aws/graph-notebook/pull/201)) ([PR #2](https://github.com/aws/graph-notebook/pull/203))
 
 ## Release 3.0.6 (September 20, 2021)
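
The unified JSON input from the second entry above works roughly as follows in a notebook cell (a sketch modeled on the Neptune ML sample notebooks; the phase shown, job name, S3 path, and HPO values are all illustrative):

%%neptune_ml training start --wait --store-to training_results
{
  "id": "my-training-job",
  "dataProcessingJobId": "my-dataprocessing-job",
  "trainModelS3Location": "s3://my-bucket/training",
  "maxHPONumberOfTrainingJobs": 2,
  "maxHPOParallelTrainingJobs": 2
}

The keys mirror the NeptuneML API parameter names (see the client.py changes below), so any parameter the API accepts can be passed without needing a dedicated command-line flag.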
446 changes: 407 additions & 39 deletions src/graph_notebook/magics/ml.py

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions src/graph_notebook/neptune/client.py
@@ -392,14 +392,17 @@ def dataprocessing_stop(self, job_id: str, clean=False, neptune_iam_role_arn: st
         return res
 
     def modeltraining_start(self, data_processing_job_id: str, train_model_s3_location: str,
+                            max_hpo_number_of_training_jobs: int, max_hpo_parallel_training_jobs: int,
                             **kwargs) -> requests.Response:
         """
         for a full list of supported parameters, see:
         https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning-api-modeltraining.html
         """
         data = {
             'dataProcessingJobId': data_processing_job_id,
-            'trainModelS3Location': train_model_s3_location
+            'trainModelS3Location': train_model_s3_location,
+            'maxHPONumberOfTrainingJobs': max_hpo_number_of_training_jobs,
+            'maxHPOParallelTrainingJobs': max_hpo_parallel_training_jobs
         }
 
         for k, v in kwargs.items():
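
As a usage sketch for the updated signature (assuming an already-configured Client instance named client; the job ID and S3 path are placeholders):

# start model training with an explicit HPO budget
res = client.modeltraining_start('my-dataprocessing-job',    # data_processing_job_id
                                 's3://my-bucket/training',  # train_model_s3_location
                                 2,                          # max_hpo_number_of_training_jobs
                                 2)                          # max_hpo_parallel_training_jobs
assert res.status_code == 200
training_job_id = res.json()['id']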
@@ -444,8 +447,9 @@ def modeltraining_stop(self, training_job_id: str, neptune_iam_role_arn: str = '
         res = self._http_session.send(req)
         return res
 
-    def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id: str = '', modeltraining_job_id: str = '',
-                              training_job_name: str = '', **kwargs) -> requests.Response:
+    def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id: str = '',
+                              modeltraining_job_id: str = '', training_job_name: str = '',
+                              **kwargs) -> requests.Response:
         logger.debug("modeltransform_create initiated with params:"
                      f"output_s3_location: {output_s3_location}\n"
                      f"dataprocessing_job_id: {dataprocessing_job_id}\n"
@@ -462,7 +466,8 @@ def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id:
             data['mlModelTrainingJobId'] = modeltraining_job_id
         else:
             raise ValueError(
-                'Invalid input. Must only specify either dataprocessing_job_id and modeltraining_job_id or only training_job_name')
+                'Invalid input. Must only specify either dataprocessing_job_id and modeltraining_job_id or only '
+                'training_job_name')
 
         for k, v in kwargs.items():
             data[k] = v
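
A sketch of the two mutually exclusive call shapes accepted by modeltransform_create (placeholder IDs and S3 path; client as above):

# either: derive the transform from a dataprocessing job plus a model-training job
res = client.modeltransform_create(output_s3_location='s3://my-bucket/transform',
                                   dataprocessing_job_id='my-dataprocessing-job',
                                   modeltraining_job_id='my-training-job')

# or: reference a training job by name instead (assumed here to be the SageMaker job name)
res = client.modeltransform_create(output_s3_location='s3://my-bucket/transform',
                                   training_job_name='my-training-job-name')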
@@ -511,10 +516,17 @@ def modeltransform_stop(self, job_id: str, iam_role: str = '', clean: bool = Fal
         res = self._http_session.send(req)
         return res
 
-    def endpoints_create(self, training_job_id: str, **kwargs) -> requests.Response:
-        data = {
-            'mlModelTrainingJobId': training_job_id
-        }
+    def endpoints_create(self, model_training_job_id: str = '', model_transform_job_id: str = '',
+                         **kwargs) -> requests.Response:
+        data = {}
+
+        if model_training_job_id and not model_transform_job_id:
+            data['mlModelTrainingJobId'] = model_training_job_id
+        elif model_transform_job_id and not model_training_job_id:
+            data['mlModelTransformJobId'] = model_transform_job_id
+        else:
+            raise ValueError('Invalid input. Must either specify model_training_job_id or model_transform_job_id, '
+                             'and not both.')
 
         for k, v in kwargs.items():
             data[k] = v
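
Usage sketch for the reworked endpoints_create (placeholder job IDs; client as above). Exactly one of the two IDs must be supplied:

# create an inference endpoint from a completed model-training job...
res = client.endpoints_create(model_training_job_id='my-training-job')

# ...or from a completed model-transform job; passing both (or neither) raises ValueError
res = client.endpoints_create(model_transform_job_id='my-transform-job')
assert res.status_code == 200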
(Diff in a Neptune ML sample notebook; file path not rendered.)
@@ -443,7 +443,9 @@
 "--job-id {training_job_name}\n",
 "--data-processing-id {training_job_name} \n",
 "--instance-type ml.p3.2xlarge\n",
-"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
+"--s3-output-uri {str(s3_bucket_uri)}/training\n",
+"--max-hpo-number 2\n",
+"--max-hpo-parallel 2 \"\"\""
 ],
 "outputs": [],
 "metadata": {}
@@ -479,7 +481,7 @@
 "source": [
 "endpoint_params=f\"\"\"\n",
 "--job-id {training_job_name} \n",
-"--model-job-id {training_job_name} \"\"\""
+"--model-training-job-id {training_job_name} \"\"\""
 ],
 "outputs": [],
 "metadata": {}
@@ -751,7 +753,7 @@
 "cell_type": "code",
 "execution_count": null,
 "source": [
-"neptune_ml.delete_endpoint(endpoint)"
+"neptune_ml.delete_endpoint(training_job_name)"
 ],
 "outputs": [],
 "metadata": {}
(Diff in a Neptune ML sample notebook; file path not rendered.)
@@ -440,7 +440,9 @@
 "--job-id {training_job_name} \n",
 "--data-processing-id {training_job_name} \n",
 "--instance-type ml.p3.2xlarge\n",
-"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
+"--s3-output-uri {str(s3_bucket_uri)}/training\n",
+"--max-hpo-number 2\n",
+"--max-hpo-parallel 2 \"\"\""
 ]
 },
 {
@@ -475,7 +477,7 @@
 "source": [
 "endpoint_params=f\"\"\"\n",
 "--job-id {training_job_name} \n",
-"--model-job-id {training_job_name}\"\"\""
+"--model-training-job-id {training_job_name}\"\"\""
 ]
 },
 {
(Diff in a Neptune ML sample notebook; file path not rendered.)
@@ -414,7 +414,9 @@
 "--job-id {training_job_name} \n",
 "--data-processing-id {training_job_name} \n",
 "--instance-type ml.p3.2xlarge\n",
-"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
+"--s3-output-uri {str(s3_bucket_uri)}/training\n",
+"--max-hpo-number 2\n",
+"--max-hpo-parallel 2 \"\"\""
 ]
 },
 {
@@ -449,7 +451,7 @@
 "source": [
 "endpoint_params=f\"\"\"\n",
 "--job-id {training_job_name} \n",
-"--model-job-id {training_job_name}\"\"\""
+"--model-training-job-id {training_job_name}\"\"\""
 ]
 },
 {
(Diff in a Neptune ML sample notebook; file path not rendered.)
@@ -442,7 +442,9 @@
 "--job-id {training_job_name} \n",
 "--data-processing-id {training_job_name} \n",
 "--instance-type ml.p3.2xlarge\n",
-"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
+"--s3-output-uri {str(s3_bucket_uri)}/training\n",
+"--max-hpo-number 2\n",
+"--max-hpo-parallel 2 \"\"\""
 ]
 },
 {
@@ -478,7 +480,7 @@
 "source": [
 "endpoint_params=f\"\"\"\n",
 "--job-id {training_job_name} \n",
-"--model-job-id {training_job_name}\"\"\""
+"--model-training-job-id {training_job_name}\"\"\""
 ]
 },
 {
@@ -636,7 +638,7 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"neptune_ml.delete_endpoint(endpoint)"
+"neptune_ml.delete_endpoint(training_job_name)"
 ]
 },
 {
(Diff in a Neptune ML sample notebook; file path not rendered.)
@@ -441,7 +441,9 @@
 "--job-id {training_job_name} \n",
 "--data-processing-id {training_job_name} \n",
 "--instance-type ml.p3.2xlarge\n",
-"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
+"--s3-output-uri {str(s3_bucket_uri)}/training\n",
+"--max-hpo-number 2\n",
+"--max-hpo-parallel 2 \"\"\""
 ]
 },
 {
@@ -476,7 +478,7 @@
 "source": [
 "endpoint_params=f\"\"\"\n",
 "--job-id {training_job_name} \n",
-"--model-job-id {training_job_name}\"\"\""
+"--model-training-job-id {training_job_name}\"\"\""
 ]
 },
 {
29 changes: 19 additions & 10 deletions test/integration/iam/ml/test_neptune_ml_with_iam.py
@@ -24,6 +24,8 @@
 NEPTUNE_ML_IAM_ROLE_ARN = os.getenv('NEPTUNE_ML_IAM_ROLE_ARN')
 NEPTUNE_ML_COMPLETED_TRAINING_ID = os.getenv('NEPTUNE_ML_COMPLETED_TRAINING_ID')
 NEPTUNE_ML_TRANSFORM_OUTPUT = 's3://akline-misc/transform'
+NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS = 2
+NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS = 2
 
 
 class TestNeptuneMLWithIAM(GraphNotebookIntegrationTest):
@@ -89,7 +91,9 @@ def test_neptune_ml_dataprocessing(self):
 
     def test_neptune_ml_modeltraining(self):
         training_res = self.client.modeltraining_start(NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
-                                                       NEPTUNE_ML_TRAINING_OUTPUT)
+                                                       NEPTUNE_ML_TRAINING_OUTPUT,
+                                                       NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS,
+                                                       NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS)
         assert training_res.status_code == 200
         training = training_res.json()
 
@@ -109,9 +113,9 @@ def test_neptune_ml_modeltraining(self):
         assert delete_res.status_code == 200
 
     def test_neptune_ml_modeltransform(self):
-        create_res = self.client.modeltransform_create(NEPTUNE_ML_TRANSFORM_OUTPUT,
-                                                       NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
-                                                       NEPTUNE_ML_COMPLETED_TRAINING_ID)
+        create_res = self.client.modeltransform_create(output_s3_location=NEPTUNE_ML_TRANSFORM_OUTPUT,
+                                                       dataprocessing_job_id=NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
+                                                       modeltraining_job_id=NEPTUNE_ML_COMPLETED_TRAINING_ID)
         assert create_res.status_code == 200
 
         create = create_res.json()
@@ -138,6 +142,8 @@ def test_neptune_ml_e2e(self):
         s3_input_uri = os.getenv('NEPTUNE_ML_DATAPROCESSING_S3_INPUT', '')
         s3_processed_uri = os.getenv('NEPTUNE_ML_DATAPROCESSING_S3_PROCESSED', '')
         train_model_s3_location = os.getenv('NEPTUNE_ML_TRAINING_S3_LOCATION', '')
+        hpo_number = NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS
+        hpo_parallel = NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS
 
         assert s3_input_uri != ''
         assert s3_processed_uri != ''
@@ -152,7 +158,7 @@ def test_neptune_ml_e2e(self):
         p.join(3600)
 
         logger.info("model training...")
-        training_job = do_modeltraining(dataprocessing_id, train_model_s3_location)
+        training_job = do_modeltraining(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
         training_job_id = training_job['id']
 
         p = threading.Thread(target=wait_for_modeltraining_complete, args=(training_job_id,))
@@ -186,14 +192,16 @@ def test_neptune_ml_modeltraining_status(self):
     def test_neptune_ml_training(self):
         dataprocessing_id = os.getenv('NEPTUNE_ML_DATAPROCESSING_ID', '')
         train_model_s3_location = os.getenv('NEPTUNE_ML_TRAINING_S3_LOCATION', '')
+        hpo_number = NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS
+        hpo_parallel = NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS
 
         assert dataprocessing_id != ''
         assert train_model_s3_location != ''
 
         dataprocessing_status = client.dataprocessing_job_status(dataprocessing_id)
         assert dataprocessing_status.status_code == 200
 
-        job_start_res = client.modeltraining_start(dataprocessing_id, train_model_s3_location)
+        job_start_res = client.modeltraining_start(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
         assert job_start_res.status_code == 200
 
         job_id = job_start_res.json()['id']
@@ -238,10 +246,11 @@ def wait_for_dataprocessing_complete(dataprocessing_id: str):
         time.sleep(10)
 
 
-def do_modeltraining(dataprocessing_id, train_model_s3_location):
+def do_modeltraining(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel):
     logger.info(
-        f"starting training job from dataprocessing_job_id={dataprocessing_id} and training_model_s3_location={train_model_s3_location}")
-    training_start = client.modeltraining_start(dataprocessing_id, train_model_s3_location)
+        f"starting training job from dataprocessing_job_id={dataprocessing_id} "
+        f"and training_model_s3_location={train_model_s3_location}")
+    training_start = client.modeltraining_start(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
     assert training_start.status_code == 200
     return training_start.json()
 
@@ -261,7 +270,7 @@ def wait_for_modeltraining_complete(training_job: str) -> dict:
 
 
 def do_create_endpoint(training_job_id: str) -> dict:
-    endpoint_res = client.endpoints_create(training_job_id)
+    endpoint_res = client.endpoints_create(model_training_job_id=training_job_id)
     assert endpoint_res.status_code == 200
     return endpoint_res.json()