From dad6face2bd8dfcdff4f41b1edb0fed063655d17 Mon Sep 17 00:00:00 2001 From: clint shryock Date: Thu, 22 Sep 2016 14:59:13 -0500 Subject: [PATCH] re-go-fmt after rebase use us-west-2 region in tests update test with working config provider/aws: Update EMR contribution with passing test, polling for instance in DELETE method remove defaulted role document emr_cluster rename aws_emr -> aws_emr_cluster update docs for name change update delete timeout/polling rename emr taskgroup to emr instance group default instance group count to 0, down from 60 update to ref emr_cluster, emr_instance_group more cleanups for instance groups; need to read and update add read, delete method for instance groups refactor the read method to seperate out the fetching of the specific group more refactoring for finding instance groups update emr instance group docs err check on reading HTTP. Dont' return the error, just log it refactor the create method to catch optionals additional cleanups, added a read method update test to be non-master-only wrap up the READ method for clusters poll for instance group to be running after a modification patch up a possible deref provider/aws: EMR cleanups fix test naming remove outdated docs randomize emr_profile names --- builtin/providers/aws/provider.go | 14 +- builtin/providers/aws/resource_aws_emr.go | 498 ------------- .../providers/aws/resource_aws_emr_cluster.go | 668 ++++++++++++++++++ .../aws/resource_aws_emr_cluster_test.go | 373 ++++++++++ .../aws/resource_aws_emr_instance_group.go | 251 +++++++ .../resource_aws_emr_instance_group_test.go | 356 ++++++++++ .../aws/resource_aws_emr_task_group.go | 115 --- .../aws/resource_aws_emr_task_group_test.go | 176 ----- .../providers/aws/resource_aws_emr_test.go | 184 ----- .../docs/providers/aws/r/emr_cluster.html.md | 397 +++++++++++ .../aws/r/emr_instance_group.html.md | 50 ++ website/source/layouts/aws.erb | 13 + 12 files changed, 2115 insertions(+), 980 deletions(-) delete mode 100644 builtin/providers/aws/resource_aws_emr.go create mode 100644 builtin/providers/aws/resource_aws_emr_cluster.go create mode 100644 builtin/providers/aws/resource_aws_emr_cluster_test.go create mode 100644 builtin/providers/aws/resource_aws_emr_instance_group.go create mode 100644 builtin/providers/aws/resource_aws_emr_instance_group_test.go delete mode 100644 builtin/providers/aws/resource_aws_emr_task_group.go delete mode 100644 builtin/providers/aws/resource_aws_emr_task_group_test.go delete mode 100644 builtin/providers/aws/resource_aws_emr_test.go create mode 100644 website/source/docs/providers/aws/r/emr_cluster.html.md create mode 100644 website/source/docs/providers/aws/r/emr_instance_group.html.md diff --git a/builtin/providers/aws/provider.go b/builtin/providers/aws/provider.go index 70e741e0ea..c3e48e9023 100644 --- a/builtin/providers/aws/provider.go +++ b/builtin/providers/aws/provider.go @@ -232,13 +232,13 @@ func Provider() terraform.ResourceProvider { "aws_elasticsearch_domain": resourceAwsElasticSearchDomain(), "aws_elastictranscoder_pipeline": resourceAwsElasticTranscoderPipeline(), "aws_elastictranscoder_preset": resourceAwsElasticTranscoderPreset(), - "aws_elb": resourceAwsElb(), - "aws_elb_attachment": resourceAwsElbAttachment(), - "aws_emr": resourceAwsEMR(), - "aws_emr_task_group": resourceAwsEMRTaskGroup(), - "aws_flow_log": resourceAwsFlowLog(), - "aws_glacier_vault": resourceAwsGlacierVault(), - "aws_iam_access_key": resourceAwsIamAccessKey(), + "aws_elb": resourceAwsElb(), + "aws_elb_attachment": resourceAwsElbAttachment(), + "aws_emr_cluster": resourceAwsEMRCluster(), + "aws_emr_instance_group": resourceAwsEMRInstanceGroup(), + "aws_flow_log": resourceAwsFlowLog(), + "aws_glacier_vault": resourceAwsGlacierVault(), + "aws_iam_access_key": resourceAwsIamAccessKey(), "aws_iam_account_password_policy": resourceAwsIamAccountPasswordPolicy(), "aws_iam_group_policy": resourceAwsIamGroupPolicy(), "aws_iam_group": resourceAwsIamGroup(), diff --git a/builtin/providers/aws/resource_aws_emr.go b/builtin/providers/aws/resource_aws_emr.go deleted file mode 100644 index 76201cd03d..0000000000 --- a/builtin/providers/aws/resource_aws_emr.go +++ /dev/null @@ -1,498 +0,0 @@ -package aws - -import ( - "log" - - "encoding/json" - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/emr" - "github.com/hashicorp/terraform/helper/resource" - "github.com/hashicorp/terraform/helper/schema" - "io/ioutil" - "net/http" - "strings" - "time" -) - -func resourceAwsEMR() *schema.Resource { - return &schema.Resource{ - Create: resourceAwsEMRCreate, - Read: resourceAwsEMRRead, - Update: resourceAwsEMRUpdate, - Delete: resourceAwsEMRDelete, - Schema: map[string]*schema.Schema{ - "name": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "release_label": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "master_instance_type": &schema.Schema{ - Type: schema.TypeString, - Required: true, - ForceNew: true, - }, - "core_instance_type": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - Computed: true, - }, - "core_instance_count": &schema.Schema{ - Type: schema.TypeInt, - Optional: true, - Default: 0, - }, - "log_uri": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "applications": &schema.Schema{ - Type: schema.TypeSet, - Optional: true, - ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, - "ec2_attributes": &schema.Schema{ - Type: schema.TypeList, - MaxItems: 1, - Optional: true, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "key_name": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "subnet_id": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "additional_master_security_groups": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "additional_slave_security_groups": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "emr_managed_master_security_group": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "emr_managed_slave_security_group": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "instance_profile": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - }, - }, - }, - "bootstrap_action": &schema.Schema{ - Type: schema.TypeSet, - Optional: true, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "name": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "path": &schema.Schema{ - Type: schema.TypeString, - Required: true, - }, - "args": &schema.Schema{ - Type: schema.TypeSet, - Optional: true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, - }, - }, - }, - "tags": tagsSchema(), - "configurations": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "service_role": &schema.Schema{ - Type: schema.TypeString, - Optional: true, - }, - "visible_to_all_users": &schema.Schema{ - Type: schema.TypeBool, - Optional: true, - Default: true, - }, - }, - } -} - -func resourceAwsEMRCreate(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - log.Printf("[DEBUG] Creating EMR cluster") - masterInstanceType := d.Get("master_instance_type").(string) - coreInstanceType := masterInstanceType - if v, ok := d.GetOk("core_instance_type"); ok { - coreInstanceType = v.(string) - } - coreInstanceCount := d.Get("core_instance_count").(int) - - applications := d.Get("applications").(*schema.Set).List() - var userKey, subnet, extraMasterSecGrp, extraSlaveSecGrp, emrMasterSecGrp, emrSlaveSecGrp, instanceProfile, serviceRole string - instanceProfile = "EMR_EC2_DefaultRole" - - if a, ok := d.GetOk("ec2_attributes"); ok { - ec2Attributes := a.([]interface{}) - attributes := ec2Attributes[0].(map[string]interface{}) - userKey = attributes["key_name"].(string) - subnet = attributes["subnet_id"].(string) - extraMasterSecGrp = attributes["additional_master_security_groups"].(string) - extraSlaveSecGrp = attributes["additional_slave_security_groups"].(string) - emrMasterSecGrp = attributes["emr_managed_master_security_group"].(string) - emrSlaveSecGrp = attributes["emr_managed_slave_security_group"].(string) - - if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 { - instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string)) - } - } - - if v, ok := d.GetOk("service_role"); ok { - serviceRole = v.(string) - } else { - serviceRole = "EMR_DefaultRole" - } - - emrApps := expandApplications(applications) - - params := &emr.RunJobFlowInput{ - Instances: &emr.JobFlowInstancesConfig{ - Ec2KeyName: aws.String(userKey), - Ec2SubnetId: aws.String(subnet), - InstanceCount: aws.Int64(int64(coreInstanceCount + 1)), - KeepJobFlowAliveWhenNoSteps: aws.Bool(true), - MasterInstanceType: aws.String(masterInstanceType), - SlaveInstanceType: aws.String(coreInstanceType), - TerminationProtected: aws.Bool(false), - AdditionalMasterSecurityGroups: []*string{ - aws.String(extraMasterSecGrp), - }, - AdditionalSlaveSecurityGroups: []*string{ - aws.String(extraSlaveSecGrp), - }, - EmrManagedMasterSecurityGroup: aws.String(emrMasterSecGrp), - EmrManagedSlaveSecurityGroup: aws.String(emrSlaveSecGrp), - }, - Name: aws.String(d.Get("name").(string)), - Applications: emrApps, - - JobFlowRole: aws.String(instanceProfile), - ReleaseLabel: aws.String(d.Get("release_label").(string)), - ServiceRole: aws.String(serviceRole), - VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)), - } - - if v, ok := d.GetOk("log_uri"); ok { - logUrl := v.(string) - params.LogUri = aws.String(logUrl) - } - if v, ok := d.GetOk("bootstrap_action"); ok { - bootstrapActions := v.(*schema.Set).List() - log.Printf("[DEBUG] %v\n", bootstrapActions) - params.BootstrapActions = expandBootstrapActions(bootstrapActions) - } - if v, ok := d.GetOk("tags"); ok { - tagsIn := v.(map[string]interface{}) - params.Tags = expandTags(tagsIn) - } - if v, ok := d.GetOk("configurations"); ok { - confUrl := v.(string) - params.Configurations = expandConfigures(confUrl) - } - - log.Printf("[DEBUG] EMR Cluster create options: %s", params) - resp, err := conn.RunJobFlow(params) - - if err != nil { - log.Printf("[ERROR] %s", err) - return err - } - - log.Printf("[DEBUG] Created EMR Cluster done...") - d.SetId(*resp.JobFlowId) - - log.Println( - "[INFO] Waiting for EMR Cluster to be available") - - stateConf := &resource.StateChangeConf{ - Pending: []string{"STARTING", "BOOTSTRAPPING"}, - Target: []string{"WAITING", "RUNNING"}, - Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), - Timeout: 40 * time.Minute, - MinTimeout: 10 * time.Second, - Delay: 30 * time.Second, // Wait 30 secs before starting - } - - _, err = stateConf.WaitForState() - if err != nil { - return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\": %s", err) - } - - return resourceAwsEMRRead(d, meta) -} - -func resourceAwsEMRRead(d *schema.ResourceData, meta interface{}) error { - emrconn := meta.(*AWSClient).emrconn - - req := &emr.DescribeClusterInput{ - ClusterId: aws.String(d.Id()), - } - - resp, err := emrconn.DescribeCluster(req) - if err != nil { - return fmt.Errorf("Error reading EMR cluster: %s", err) - } - - if resp.Cluster == nil { - d.SetId("") - log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id()) - return nil - } - - instance := resp.Cluster - - if instance.Status != nil { - if *resp.Cluster.Status.State == "TERMINATED" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) - d.SetId("") - return nil - } - - if *resp.Cluster.Status.State == "TERMINATED_WITH_ERRORS" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) - d.SetId("") - return nil - } - } - - instanceGroups, errGrps := loadGroups(d, meta) - if errGrps == nil { - coreGroup := findGroup(instanceGroups, "CORE") - if coreGroup != nil { - d.Set("core_instance_type", coreGroup.InstanceType) - } - } - - return nil -} - -func resourceAwsEMRUpdate(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - if d.HasChange("core_instance_count") { - log.Printf("[DEBUG] Modify EMR cluster") - req := &emr.ListInstanceGroupsInput{ - ClusterId: aws.String(d.Id()), - } - - respGrps, errGrps := conn.ListInstanceGroups(req) - if errGrps != nil { - return fmt.Errorf("Error reading EMR cluster: %s", errGrps) - } - instanceGroups := respGrps.InstanceGroups - - coreInstanceCount := d.Get("core_instance_count").(int) - coreGroup := findGroup(instanceGroups, "CORE") - - params := &emr.ModifyInstanceGroupsInput{ - InstanceGroups: []*emr.InstanceGroupModifyConfig{ - { - InstanceGroupId: aws.String(*coreGroup.Id), - InstanceCount: aws.Int64(int64(coreInstanceCount)), - }, - }, - } - _, errModify := conn.ModifyInstanceGroups(params) - if errModify != nil { - log.Printf("[ERROR] %s", errModify) - return errModify - } - - log.Printf("[DEBUG] Modify EMR Cluster done...") - } - - return resourceAwsEMRRead(d, meta) -} - -func resourceAwsEMRDelete(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).emrconn - - req := &emr.TerminateJobFlowsInput{ - JobFlowIds: []*string{ - aws.String(d.Id()), - }, - } - - _, err := conn.TerminateJobFlows(req) - if err != nil { - log.Printf("[ERROR], %s", err) - return err - } - - d.SetId("") - return nil -} - -func expandApplications(apps []interface{}) []*emr.Application { - appOut := make([]*emr.Application, 0, len(apps)) - - for _, appName := range expandStringList(apps) { - app := &emr.Application{ - Name: appName, - } - appOut = append(appOut, app) - } - return appOut -} - -func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) { - emrconn := meta.(*AWSClient).emrconn - reqGrps := &emr.ListInstanceGroupsInput{ - ClusterId: aws.String(d.Id()), - } - - respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps) - if errGrps != nil { - return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps) - } - return respGrps.InstanceGroups, nil -} - -func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup { - for _, grp := range grps { - if *grp.InstanceGroupType == typ { - return grp - } - } - return nil -} - -func expandTags(m map[string]interface{}) []*emr.Tag { - var result []*emr.Tag - for k, v := range m { - result = append(result, &emr.Tag{ - Key: aws.String(k), - Value: aws.String(v.(string)), - }) - } - - return result -} - -func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig { - actionsOut := []*emr.BootstrapActionConfig{} - - for _, raw := range bootstrapActions { - actionAttributes := raw.(map[string]interface{}) - actionName := actionAttributes["name"].(string) - actionPath := actionAttributes["path"].(string) - actionArgs := actionAttributes["args"].(*schema.Set).List() - - action := &emr.BootstrapActionConfig{ - Name: aws.String(actionName), - ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{ - Path: aws.String(actionPath), - Args: expandStringList(actionArgs), - }, - } - actionsOut = append(actionsOut, action) - } - - return actionsOut -} - -func expandConfigures(input string) []*emr.Configuration { - configsOut := []*emr.Configuration{} - if strings.HasPrefix(input, "http") { - readHttpJson(input, &configsOut) - } else if strings.HasSuffix(input, ".json") { - readLocalJson(input, &configsOut) - } else { - readBodyJson(input, &configsOut) - } - log.Printf("[DEBUG] Configures %v\n", configsOut) - - return configsOut -} - -func readHttpJson(url string, target interface{}) error { - r, err := http.Get(url) - if err != nil { - return err - } - defer r.Body.Close() - - return json.NewDecoder(r.Body).Decode(target) -} - -func readLocalJson(localFile string, target interface{}) error { - file, e := ioutil.ReadFile(localFile) - if e != nil { - log.Printf("[ERROR] %s", e) - return e - } - - return json.Unmarshal(file, target) -} - -func readBodyJson(body string, target interface{}) error { - log.Printf("[DEBUG] Raw Body %s\n", body) - err := json.Unmarshal([]byte(body), target) - if err != nil { - log.Printf("[ERROR] parsing JSON %s", err) - return err - } - return nil -} - -func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { - return func() (interface{}, string, error) { - conn := meta.(*AWSClient).emrconn - - log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id()) - params := &emr.DescribeClusterInput{ - ClusterId: aws.String(d.Id()), - } - - resp, err := conn.DescribeCluster(params) - - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if "ClusterNotFound" == awsErr.Code() { - return 42, "destroyed", nil - } - } - log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err) - return nil, "", err - } - - emrc := resp.Cluster - - if emrc == nil { - return 42, "destroyed", nil - } - - if resp.Cluster.Status != nil { - log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status) - } - - return emrc, *emrc.Status.State, nil - } -} diff --git a/builtin/providers/aws/resource_aws_emr_cluster.go b/builtin/providers/aws/resource_aws_emr_cluster.go new file mode 100644 index 0000000000..82f744c491 --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_cluster.go @@ -0,0 +1,668 @@ +package aws + +import ( + "log" + + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/helper/schema" +) + +func resourceAwsEMRCluster() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsEMRClusterCreate, + Read: resourceAwsEMRClusterRead, + Update: resourceAwsEMRClusterUpdate, + Delete: resourceAwsEMRClusterDelete, + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "release_label": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "master_instance_type": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "core_instance_type": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Computed: true, + }, + "core_instance_count": &schema.Schema{ + Type: schema.TypeInt, + Optional: true, + Default: 0, + }, + "cluster_state": &schema.Schema{ + Type: schema.TypeString, + Computed: true, + }, + "log_uri": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Optional: true, + }, + "master_public_dns": &schema.Schema{ + Type: schema.TypeString, + Computed: true, + }, + "applications": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + "ec2_attributes": &schema.Schema{ + Type: schema.TypeList, + MaxItems: 1, + Optional: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "key_name": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "subnet_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_master_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_slave_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_master_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_slave_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "instance_profile": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + }, + }, + }, + "bootstrap_action": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "path": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "args": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + }, + }, + }, + "tags": tagsSchema(), + "configurations": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Optional: true, + }, + "service_role": &schema.Schema{ + Type: schema.TypeString, + ForceNew: true, + Required: true, + }, + "visible_to_all_users": &schema.Schema{ + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + Default: true, + }, + }, + } +} + +func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + log.Printf("[DEBUG] Creating EMR cluster") + masterInstanceType := d.Get("master_instance_type").(string) + coreInstanceType := masterInstanceType + if v, ok := d.GetOk("core_instance_type"); ok { + coreInstanceType = v.(string) + } + coreInstanceCount := d.Get("core_instance_count").(int) + + applications := d.Get("applications").(*schema.Set).List() + + instanceConfig := &emr.JobFlowInstancesConfig{ + MasterInstanceType: aws.String(masterInstanceType), + SlaveInstanceType: aws.String(coreInstanceType), + InstanceCount: aws.Int64(int64(coreInstanceCount)), + // Default values that we can open up in the future + KeepJobFlowAliveWhenNoSteps: aws.Bool(true), + TerminationProtected: aws.Bool(false), + } + + var instanceProfile string + if a, ok := d.GetOk("ec2_attributes"); ok { + ec2Attributes := a.([]interface{}) + attributes := ec2Attributes[0].(map[string]interface{}) + + if v, ok := attributes["key_name"]; ok { + instanceConfig.Ec2KeyName = aws.String(v.(string)) + } + if v, ok := attributes["subnet_id"]; ok { + instanceConfig.Ec2SubnetId = aws.String(v.(string)) + } + if v, ok := attributes["subnet_id"]; ok { + instanceConfig.Ec2SubnetId = aws.String(v.(string)) + } + + if v, ok := attributes["additional_master_security_groups"]; ok { + strSlice := strings.Split(v.(string), ",") + for i, s := range strSlice { + strSlice[i] = strings.TrimSpace(s) + } + instanceConfig.AdditionalMasterSecurityGroups = aws.StringSlice(strSlice) + } + + if v, ok := attributes["additional_slave_security_groups"]; ok { + strSlice := strings.Split(v.(string), ",") + for i, s := range strSlice { + strSlice[i] = strings.TrimSpace(s) + } + instanceConfig.AdditionalSlaveSecurityGroups = aws.StringSlice(strSlice) + } + + if v, ok := attributes["emr_managed_master_security_group"]; ok { + instanceConfig.EmrManagedMasterSecurityGroup = aws.String(v.(string)) + } + if v, ok := attributes["emr_managed_slave_security_group"]; ok { + instanceConfig.EmrManagedSlaveSecurityGroup = aws.String(v.(string)) + } + + if len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 { + instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string)) + } + } + + emrApps := expandApplications(applications) + + params := &emr.RunJobFlowInput{ + Instances: instanceConfig, + Name: aws.String(d.Get("name").(string)), + Applications: emrApps, + + ReleaseLabel: aws.String(d.Get("release_label").(string)), + ServiceRole: aws.String(d.Get("service_role").(string)), + VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)), + } + + if v, ok := d.GetOk("log_uri"); ok { + params.LogUri = aws.String(v.(string)) + } + + if instanceProfile != "" { + params.JobFlowRole = aws.String(instanceProfile) + } + + if v, ok := d.GetOk("bootstrap_action"); ok { + bootstrapActions := v.(*schema.Set).List() + params.BootstrapActions = expandBootstrapActions(bootstrapActions) + } + if v, ok := d.GetOk("tags"); ok { + tagsIn := v.(map[string]interface{}) + params.Tags = expandTags(tagsIn) + } + if v, ok := d.GetOk("configurations"); ok { + confUrl := v.(string) + params.Configurations = expandConfigures(confUrl) + } + + log.Printf("[DEBUG] EMR Cluster create options: %s", params) + resp, err := conn.RunJobFlow(params) + + if err != nil { + log.Printf("[ERROR] %s", err) + return err + } + + d.SetId(*resp.JobFlowId) + + log.Println( + "[INFO] Waiting for EMR Cluster to be available") + + stateConf := &resource.StateChangeConf{ + Pending: []string{"STARTING", "BOOTSTRAPPING"}, + Target: []string{"WAITING", "RUNNING"}, + Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), + Timeout: 40 * time.Minute, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + _, err = stateConf.WaitForState() + if err != nil { + return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\": %s", err) + } + + return resourceAwsEMRClusterRead(d, meta) +} + +func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { + emrconn := meta.(*AWSClient).emrconn + + req := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := emrconn.DescribeCluster(req) + if err != nil { + return fmt.Errorf("Error reading EMR cluster: %s", err) + } + + if resp.Cluster == nil { + log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id()) + d.SetId("") + return nil + } + + cluster := resp.Cluster + + if cluster.Status != nil { + if *cluster.Status.State == "TERMINATED" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) + d.SetId("") + return nil + } + + if *cluster.Status.State == "TERMINATED_WITH_ERRORS" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) + d.SetId("") + return nil + } + + d.Set("cluster_state", cluster.Status.State) + } + + instanceGroups, err := fetchAllEMRInstanceGroups(meta, d.Id()) + if err == nil { + coreGroup := findGroup(instanceGroups, "CORE") + if coreGroup != nil { + d.Set("core_instance_type", coreGroup.InstanceType) + } + } + + d.Set("name", cluster.Name) + d.Set("service_role", cluster.ServiceRole) + d.Set("release_label", cluster.ReleaseLabel) + d.Set("log_uri", cluster.LogUri) + d.Set("master_public_dns", cluster.MasterPublicDnsName) + d.Set("visible_to_all_users", cluster.VisibleToAllUsers) + d.Set("tags", tagsToMapEMR(cluster.Tags)) + + if err := d.Set("applications", flattenApplications(cluster.Applications)); err != nil { + log.Printf("[ERR] Error setting EMR Applications for cluster (%s): %s", d.Id(), err) + } + + // Configurations is a JSON document. It's built with an expand method but a + // simple string should be returned as JSON + if err := d.Set("configurations", cluster.Configurations); err != nil { + log.Printf("[ERR] Error setting EMR configurations for cluster (%s): %s", d.Id(), err) + } + + if err := d.Set("ec2_attributes", flattenEc2Attributes(cluster.Ec2InstanceAttributes)); err != nil { + log.Printf("[ERR] Error setting EMR Ec2 Attributes: %s", err) + } + return nil +} + +func resourceAwsEMRClusterUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + if d.HasChange("core_instance_count") { + log.Printf("[DEBUG] Modify EMR cluster") + groups, err := fetchAllEMRInstanceGroups(meta, d.Id()) + if err != nil { + log.Printf("[DEBUG] Error finding all instance groups: %s", err) + return err + } + + coreInstanceCount := d.Get("core_instance_count").(int) + coreGroup := findGroup(groups, "CORE") + if coreGroup == nil { + return fmt.Errorf("[ERR] Error finding core group") + } + + params := &emr.ModifyInstanceGroupsInput{ + InstanceGroups: []*emr.InstanceGroupModifyConfig{ + { + InstanceGroupId: coreGroup.Id, + InstanceCount: aws.Int64(int64(coreInstanceCount)), + }, + }, + } + _, errModify := conn.ModifyInstanceGroups(params) + if errModify != nil { + log.Printf("[ERROR] %s", errModify) + return errModify + } + + log.Printf("[DEBUG] Modify EMR Cluster done...") + } + + log.Println( + "[INFO] Waiting for EMR Cluster to be available") + + stateConf := &resource.StateChangeConf{ + Pending: []string{"STARTING", "BOOTSTRAPPING"}, + Target: []string{"WAITING", "RUNNING"}, + Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), + Timeout: 40 * time.Minute, + MinTimeout: 10 * time.Second, + Delay: 5 * time.Second, + } + + _, err := stateConf.WaitForState() + if err != nil { + return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\" or \"RUNNING\" after modification: %s", err) + } + + return resourceAwsEMRClusterRead(d, meta) +} + +func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + req := &emr.TerminateJobFlowsInput{ + JobFlowIds: []*string{ + aws.String(d.Id()), + }, + } + + _, err := conn.TerminateJobFlows(req) + if err != nil { + log.Printf("[ERROR], %s", err) + return err + } + + err = resource.Retry(10*time.Minute, func() *resource.RetryError { + resp, err := conn.ListInstances(&emr.ListInstancesInput{ + ClusterId: aws.String(d.Id()), + }) + + if err != nil { + return resource.NonRetryableError(err) + } + + instanceCount := len(resp.Instances) + + if resp == nil || instanceCount == 0 { + log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", d.Id()) + return nil + } + + // Collect instance status states, wait for all instances to be terminated + // before moving on + var terminated []string + for j, i := range resp.Instances { + if i.Status != nil { + if *i.Status.State == "TERMINATED" { + terminated = append(terminated, *i.Ec2InstanceId) + } + } else { + log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, *i.Ec2InstanceId) + } + } + if len(terminated) == instanceCount { + log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, d.Id()) + return nil + } + return resource.RetryableError(fmt.Errorf("[DEBUG] EMR Cluster (%s) has (%d) Instances remaining, retrying", d.Id(), len(resp.Instances))) + }) + + if err != nil { + log.Printf("[ERR] Error waiting for EMR Cluster (%s) Instances to drain", d.Id()) + } + + d.SetId("") + return nil +} + +func expandApplications(apps []interface{}) []*emr.Application { + appOut := make([]*emr.Application, 0, len(apps)) + + for _, appName := range expandStringList(apps) { + app := &emr.Application{ + Name: appName, + } + appOut = append(appOut, app) + } + return appOut +} + +func flattenApplications(apps []*emr.Application) []interface{} { + appOut := make([]interface{}, 0, len(apps)) + + for _, app := range apps { + appOut = append(appOut, *app.Name) + } + return appOut +} + +func flattenEc2Attributes(ia *emr.Ec2InstanceAttributes) []map[string]interface{} { + attrs := map[string]interface{}{} + result := make([]map[string]interface{}, 0) + + if ia.Ec2KeyName != nil { + attrs["key_name"] = *ia.Ec2KeyName + } + if ia.Ec2SubnetId != nil { + attrs["subnet_id"] = *ia.Ec2SubnetId + } + if ia.IamInstanceProfile != nil { + attrs["instance_profile"] = *ia.IamInstanceProfile + } + if ia.EmrManagedMasterSecurityGroup != nil { + attrs["emr_managed_master_security_group"] = *ia.EmrManagedMasterSecurityGroup + } + if ia.EmrManagedSlaveSecurityGroup != nil { + attrs["emr_managed_slave_security_group"] = *ia.EmrManagedSlaveSecurityGroup + } + + if len(ia.AdditionalMasterSecurityGroups) > 0 { + strs := aws.StringValueSlice(ia.AdditionalMasterSecurityGroups) + attrs["additional_master_security_groups"] = strings.Join(strs, ",") + } + if len(ia.AdditionalSlaveSecurityGroups) > 0 { + strs := aws.StringValueSlice(ia.AdditionalSlaveSecurityGroups) + attrs["additional_slave_security_groups"] = strings.Join(strs, ",") + } + + result = append(result, attrs) + + return result +} + +func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) { + emrconn := meta.(*AWSClient).emrconn + reqGrps := &emr.ListInstanceGroupsInput{ + ClusterId: aws.String(d.Id()), + } + + respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps) + if errGrps != nil { + return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps) + } + return respGrps.InstanceGroups, nil +} + +func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup { + for _, grp := range grps { + if grp.InstanceGroupType != nil { + if *grp.InstanceGroupType == typ { + return grp + } + } + } + return nil +} + +func expandTags(m map[string]interface{}) []*emr.Tag { + var result []*emr.Tag + for k, v := range m { + result = append(result, &emr.Tag{ + Key: aws.String(k), + Value: aws.String(v.(string)), + }) + } + + return result +} + +func tagsToMapEMR(ts []*emr.Tag) map[string]string { + result := make(map[string]string) + for _, t := range ts { + result[*t.Key] = *t.Value + } + + return result +} + +func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig { + actionsOut := []*emr.BootstrapActionConfig{} + + for _, raw := range bootstrapActions { + actionAttributes := raw.(map[string]interface{}) + actionName := actionAttributes["name"].(string) + actionPath := actionAttributes["path"].(string) + actionArgs := actionAttributes["args"].(*schema.Set).List() + + action := &emr.BootstrapActionConfig{ + Name: aws.String(actionName), + ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{ + Path: aws.String(actionPath), + Args: expandStringList(actionArgs), + }, + } + actionsOut = append(actionsOut, action) + } + + return actionsOut +} + +func expandConfigures(input string) []*emr.Configuration { + configsOut := []*emr.Configuration{} + if strings.HasPrefix(input, "http") { + if err := readHttpJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading HTTP JSON: %s", err) + } + } else if strings.HasSuffix(input, ".json") { + if err := readLocalJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading local JSON: %s", err) + } + } else { + if err := readBodyJson(input, &configsOut); err != nil { + log.Printf("[ERR] Error reading body JSON: %s", err) + } + } + log.Printf("[DEBUG] Expanded EMR Configurations %s", configsOut) + + return configsOut +} + +func readHttpJson(url string, target interface{}) error { + r, err := http.Get(url) + if err != nil { + return err + } + defer r.Body.Close() + + return json.NewDecoder(r.Body).Decode(target) +} + +func readLocalJson(localFile string, target interface{}) error { + file, e := ioutil.ReadFile(localFile) + if e != nil { + log.Printf("[ERROR] %s", e) + return e + } + + return json.Unmarshal(file, target) +} + +func readBodyJson(body string, target interface{}) error { + log.Printf("[DEBUG] Raw Body %s\n", body) + err := json.Unmarshal([]byte(body), target) + if err != nil { + log.Printf("[ERROR] parsing JSON %s", err) + return err + } + return nil +} + +func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + conn := meta.(*AWSClient).emrconn + + log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id()) + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := conn.DescribeCluster(params) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if "ClusterNotFound" == awsErr.Code() { + return 42, "destroyed", nil + } + } + log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err) + return nil, "", err + } + + emrc := resp.Cluster + + if emrc == nil { + return 42, "destroyed", nil + } + + if resp.Cluster.Status != nil { + log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status) + } + + return emrc, *emrc.Status.State, nil + } +} diff --git a/builtin/providers/aws/resource_aws_emr_cluster_test.go b/builtin/providers/aws/resource_aws_emr_cluster_test.go new file mode 100644 index 0000000000..a871d53e89 --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_cluster_test.go @@ -0,0 +1,373 @@ +package aws + +import ( + "fmt" + "log" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" +) + +func TestAccAWSEMRCluster_basic(t *testing.T) { + var jobFlow emr.RunJobFlowOutput + r := acctest.RandInt() + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: testAccAWSEmrClusterConfig(r), + Check: testAccCheckAWSEmrClusterExists("aws_emr_cluster.tf-test-cluster", &jobFlow), + }, + }, + }) +} + +func testAccCheckAWSEmrDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).emrconn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_emr_cluster" { + continue + } + + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + } + + describe, err := conn.DescribeCluster(params) + + if err == nil { + if describe.Cluster != nil && + *describe.Cluster.Status.State == "WAITING" { + return fmt.Errorf("EMR Cluster still exists") + } + } + + providerErr, ok := err.(awserr.Error) + if !ok { + return err + } + + log.Printf("[ERROR] %v", providerErr) + } + + return nil +} + +func testAccCheckAWSEmrClusterExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No cluster id set") + } + conn := testAccProvider.Meta().(*AWSClient).emrconn + describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + }) + if err != nil { + return fmt.Errorf("EMR error: %v", err) + } + + if describe.Cluster != nil && + *describe.Cluster.Id != rs.Primary.ID { + return fmt.Errorf("EMR cluser not found") + } + + if describe.Cluster != nil && + *describe.Cluster.Status.State != "WAITING" { + return fmt.Errorf("EMR cluser is not up yet") + } + + return nil + } +} + +func testAccAWSEmrClusterConfig(r int) string { + return fmt.Sprintf(` +provider "aws" { + region = "us-west-2" +} + +resource "aws_emr_cluster" "tf-test-cluster" { + name = "emr-test-%d" + release_label = "emr-4.6.0" + applications = ["Spark"] + + ec2_attributes { + subnet_id = "${aws_subnet.main.id}" + emr_managed_master_security_group = "${aws_security_group.allow_all.id}" + emr_managed_slave_security_group = "${aws_security_group.allow_all.id}" + instance_profile = "${aws_iam_instance_profile.emr_profile.arn}" + } + + master_instance_type = "m3.xlarge" + core_instance_type = "m3.xlarge" + core_instance_count = 1 + + tags { + role = "rolename" + dns_zone = "env_zone" + env = "env" + name = "name-env" + } + + bootstrap_action { + path = "s3://elasticmapreduce/bootstrap-actions/run-if" + name = "runif" + args = ["instance.isMaster=true", "echo running on master node"] + } + + configurations = "test-fixtures/emr_configurations.json" + + depends_on = ["aws_main_route_table_association.a"] + + service_role = "${aws_iam_role.iam_emr_default_role.arn}" +} + +resource "aws_security_group" "allow_all" { + name = "allow_all" + description = "Allow all inbound traffic" + vpc_id = "${aws_vpc.main.id}" + + ingress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + depends_on = ["aws_subnet.main"] + + lifecycle { + ignore_changes = ["ingress", "egress"] + } + + tags { + name = "emr_test" + } +} + +resource "aws_vpc" "main" { + cidr_block = "168.31.0.0/16" + enable_dns_hostnames = true + + tags { + name = "emr_test" + } +} + +resource "aws_subnet" "main" { + vpc_id = "${aws_vpc.main.id}" + cidr_block = "168.31.0.0/20" + + tags { + name = "emr_test" + } +} + +resource "aws_internet_gateway" "gw" { + vpc_id = "${aws_vpc.main.id}" +} + +resource "aws_route_table" "r" { + vpc_id = "${aws_vpc.main.id}" + + route { + cidr_block = "0.0.0.0/0" + gateway_id = "${aws_internet_gateway.gw.id}" + } +} + +resource "aws_main_route_table_association" "a" { + vpc_id = "${aws_vpc.main.id}" + route_table_id = "${aws_route_table.r.id}" +} + +### + +# IAM things + +### + +# IAM role for EMR Service +resource "aws_iam_role" "iam_emr_default_role" { + name = "iam_emr_default_role_%d" + + assume_role_policy = < **NOTE:** At this time, Instance Groups cannot be destroyed through the API nor +web interface. Instance Groups are destroyed when the EMR Cluster is destroyed. +Terraform will resize any Instance Group to zero when destroying the resource. + +## Example Usage + +``` +resource "aws_emr_cluster_instance_group" "task" { + cluster_id = "${aws_emr_cluster.tf-test-cluster.id}" + instance_count = 1 + instance_type = "m3.xlarge" + name = "my little instance group" +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Optional) Optional human friendly name for this Instance Group +* `cluster_id` - (Required) ID of the EMR Cluster to attach to +* `instance_type` - (Required) Type of instances for this Group +* `instance_count` - (Optional) Count of instances to launch + + + +## ec2\_attributes + +Attributes for the Instance Group + +* `name` - Human friendly name for this Instance Group +* `cluster_id` - ID of the EMR Cluster the group is attached to +* `instance_type` - Type of instances for this Group +* `instance_count` - Count of desired instances to launch +* `running_instance_count` - Count of actual running instances in the group +* `status` - State of the instance group. One of `PROVISIONING`, `BOOTSTRAPPING`, `RUNNING`, `RESIZING`, `SUSPENDED`, `TERMINATING`, `TERMINATED`, `ARRESTED`, `SHUTTING_DOWN`, `ENDED` diff --git a/website/source/layouts/aws.erb b/website/source/layouts/aws.erb index 12451035d4..07e3bcad04 100644 --- a/website/source/layouts/aws.erb +++ b/website/source/layouts/aws.erb @@ -457,6 +457,19 @@ + > + Elastic Map Reduce Resources + + + > ElasticSearch Resources