Giter VIP home page Giter VIP logo

cloudtrail-parquet-glue's People

Contributors

alsmola avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

cloudtrail-parquet-glue's Issues

non-queryable raw logs, inconsistent results with same input, duplicate results

I've tried setting this up to compare it against an existing non-Glue-based solution, and am running into a couple of issues I'm hoping someone can help with.
Disclaimer: I have little previous experience with Glue Crawlers or Glue ETL - it's possible I'm making a simple (or more than one) mistake.

I have been able to reproduce the behavior by resetting the environment (destroy terraform, empty buckets, delete glue database/tables) and following the process below:

  • Created 4 buckets:
    xxxx-cloudtrail-glue-testing, for cloudtrail_s3_bucket
    xxxx-cloudtrail-glue-testing-scripts, for etl_script_s3_bucket
    xxxx-cloudtrail-glue-testing-output, for parquet output
    xxxx-cloudtrail-glue-testing-tmp, for parquet_s3_bucket
  • Configured and applied terraform module with terraform 0.12.29 / aws provider 3.28.0, no issues observed, crawlers/etl/workflow created as expected.
  • Enabled an organization cloudtrail for multiple accounts for 45 minutes, shipping management cloudtrail events to the xxxx-cloudtrail-glue-testing bucket with no additional prefix configured (I had a whole different set of issues when I tried a POC with s3 data events and using prefixes), to make sure I was working with a fixed number of events sent with a default configuration.
  • Ran Workflow from Glue console, indicated workflow completed successfully
    • 2 glue tables were created:
      cloudtrail_parquet
      raw_xxxx_cloudtrail_glue_testing
  • Attempted to validate results by comparing number of raw events vs number of events in parquet table:
    select count(*) from cloudtrail_parquet returns 186949
    select count(*) from raw_tmg_cloudtrail_glue_testing unqueryable, returns the following error:
HIVE_METASTORE_ERROR: com.facebook.presto.spi.PrestoException: Error: : expected at the position 2264 of 'struct<securityGroupSet:string,securityGroupIdSet:struct<items:array<struct<groupId:string>>>,filterSet:struct<items:array<struct<name:string,valueSet:struct<items:array<struct<value:string>>>>>>,lookupAttributes:array<struct<attributeKey:string,attributeValue:string>>,startTime:string,endTime:string,maxResults:string,roleArn:string,roleSessionName:string,includeShared:boolean,includePublic:boolean,subnetSet:struct<items:array<struct<subnetId:string>>>,publicIpsSet:string,allocationIdsSet:struct<items:array<struct<allocationId:string>>>,customerGatewaySet:string,networkAclIdSet:string,networkInterfaceIdSet:struct<items:array<struct<networkInterfaceId:string>>>,volumeSet:struct<items:array<struct<volumeId:string>>>,DescribeHostsRequest:struct<MaxResults:int>,vpcSet:struct<items:array<struct<vpcId:string>>>,instancesSet:struct<items:array<struct<instanceId:string,imageId:string,minCount:int,maxCount:int,keyName:string>>>,vpnGatewaySet:string,internetGatewayIdSet:string,vpnConnectionSet:string,routeTableIdSet:string,Host:string,DescribeVpcEndpointsRequest:string,DescribeNatGatewaysRequest:string,DescribeEgressOnlyInternetGatewaysRequest:string,vpcPeeringConnectionIdSet:string,externalId:string,targetGroupArn:string,groupName:string,maxItems:string,durationSeconds:int,regionSet:string,resourceArns:array<string>,pageSize:int,includeAllInstances:boolean,functionVersion:string,name:string,s3BucketName:string,includeGlobalServiceEvents:boolean,isMultiRegionTrail:boolean,enableLogFileValidation:boolean,isOrganizationTrail:boolean,bucketName:string,location:string,certificateArn:string,includes:struct<hasDnsFqdn:boolean,keyTypes:array<string>>,trailNameList:array<string>,resource:string,masterRegion:string,userName:string,functionName:string,resourceName:string,tagging:string,acl:string,website:string,replication:string,logging:string,policy:string,versioning:string,encryption:string,publicAccessBlock:string,lifecycle:string,accessKeyId:string,trailName:string,resourceIdList:array<string>,includeShadowTrails:boolean,logGroupName:string,accelerate:string,object-lock:string,notification:string,cors:string,requestPayment:string,DescribeVpcEndpointServiceConfigurationsRequest:string,keySpec:string,keyId:string,encryptionContext:struct<aws\:cloudtrail\:arn:string,aws\:s3\:arn:string,aws\:acm\:arn:string,aws\:cloudfront\:arn:string,aws\:lambda\:FunctionArn:string,aws\:elasticloadbalancing\:arn:string,aws\:codecommit\:env-alg:string,aws\:codecommit\:sig-alg:string,aws\:codecommit\:id:string,glue_catalog_id:string,service:string,aws\:rds\:db-id:string,aws\:pi\:service:string,SecretARN:string,SecretVersionId:string,domainARN:string>,resourceArn:string,paginationToken:string,tagFilters:array<struct<key:string,values:array<string>>>,resourcesPerPage:int,resourceTypeFilters:array<string>,crawlerNames:array<string>,crawlerNameList:array<string>,logStreamName:string,aggregateField:string,filter:struct<eventStatusCodes:array<string>,startTimes:array<struct<from:string>>>,jobName:string,runId:string,includeGraph:boolean,roleName:string,policyArn:string,accountAttributeNameSet:struct<items:array<struct<attributeName:string>>>,versionId:string,hidePassword:boolean,bucket:string,policyStatus:string,catalogId:string,databaseName:string,tablesToDelete:array<string>,DescribeByoipCidrsRequest:struct<MaxResults:int>,maxRecords:int,tableName:string,targetGroupArns:array<string>,targets:array<struct<id:string,port:int,availabilityZone:string,arn:string>>,dhcpOptionsSet:struct<items:array<struct<dhcpOptionsId:string>>>,subnetId:string,description:string,groupSet:struct<items:array<struct<groupId:string>>>,privateIpAddressesSet:string,ipv6AddressCount:int,resourcesSet:struct<items:array<struct<resourceId:string>>>,tagSet:struct<items:array<struct<key:string,value:string>>>,autoScalingGroupNames:array<string>,networkInterfaceId:string,nextToken:string,encryptionAlgorithm:string,serviceCode:string,marker:string,registryId:string,repositoryName:string,layerDigest:string,cluster:string,services:array<string>,imageIds:array<struct<imageDigest:string,imageTag:string>>,acceptedMediaTypes:array<string>,registryIds:array<string>,containerInstance:string,secretId:string,policyName:string,reservedInstancesSet:string,instanceId:string,agentVersion:string,agentStatus:string,platformType:string,platformName:string,platformVersion:string,iPAddress:string,computerName:string,agentName:string,jobQueue:string,parameters:struct<user_ids:string,dates:string,write_to_feature_store:string,remove_old_data:string,executionTimeout:array<string>,commands:array<string>>,jobDefinition:string,serviceNamespace:string,resourceIds:array<string>,commitId:string,branchName:string,rule:string,eventPattern:string,stateMachineArn:string,tags:array<struct<value:string,key:string>>,definition:string,streamName:string,limit:double,capacityProviderStrategy:array<struct<capacityProvider:string,weight:double,base:double>>,count:double,enableECSManagedTags:boolean,enableExecuteCommand:boolean,networkConfiguration:struct<awsvpcConfiguration:struct<assignPublicIp:string,securityGroups:array<string>,subnets:array<string>>>,overrides:struct<containerOverrides:array<struct<name:string,command:array<string>,environment:array<struct<name:string,value:string>>>>,cpu:string,memory:string>,taskDefinition:string,partitionValues:array<string>,stackStatusFilter:array<string>,tasks:array<string>,reservedInstancesModificationSet:string,loadBalancerNames:array<string>,GroupName:string,Filters:array<struct<Name:string,Values:array<string>>>,attribute:string,image:string,attributes:array<string>,exclusiveStartShardId:string,streamArn:string,showCacheNodeInfo:boolean,showCacheClustersNotInReplicationGroups:boolean,userData:string,instanceType:string,blockDeviceMapping:struct<items:array<struct<deviceName:string,ebs:struct<volumeSize:int,deleteOnTermination:boolean,volumeType:string>>>>,availabilityZone:string,monitoring:struct<enabled:boolean>,disableApiTermination:boolean,clientToken:string,iamInstanceProfile:struct<name:string>,tagSpecificationSet:struct<items:array<struct<resourceType:string,tags:array<struct<key:string,value:string>>>>>,instanceMarketOptions:struct<marketType:string,spotOptions:struct<maxPrice:string,spotInstanceType:string>>,hostedZoneId:string,startRecordName:string,startRecordType:string,changeBatch:struct<changes:array<struct<action:string,resourceRecordSet:struct<name:string,type:string,tTL:int,resourceRecords:array<struct<value:string>>>>>>,startRecordIdentifier:string,dNSName:string,destinationKeyId:string,destinationEncryptionAlgorithm:string,sourceEncryptionAlgorithm:string,sourceAAD:string,sourceEncryptionContext:struct<aws\:acm\:arn:string>,destinationAAD:string,destinationEncryptionContext:struct<aws\:elasticloadbalancing\:arn:string,aws\:acm\:arn:string>,instanceIdentityDocument:string,instanceIdentityDocumentSignature:string,totalResources:array<struct<name:string,type:string,doubleValue:double,longValue:double,integerValue:double,stringSetValue:array<string>>>,task:string,status:string,reason:string,containers:array<struct<containerName:string,runtimeId:string,networkBindings:array<struct<bindIP:string,containerPort:double,hostPort:double,protocol:string>>,status:string,exitCode:double,reason:string>>,pullStartedAt:string,pullStoppedAt:string,instanceIds:array<string>,documentName:string,timeoutSeconds:double,comment:string,outputS3BucketName:string,outputS3KeyPrefix:string,interactive:boolean,commandId:string,details:boolean,items:array<struct<typeName:string,schemaVersion:string,captureTime:string,contentHash:string,id:string,title:string,severity:string,status:string,details:struct<DocumentVersion:string,DocumentName:string>>>,associationId:string,executionResult:struct<executionDate:string,status:string,executionSummary:string,errorCode:string>,resourceId:string,resourceType:string,complianceType:string,executionSummary:struct<executionTime:string,executionId:string,executionType:string>,itemContentHash:string,executionStoppedAt:string,instances:array<string>,loadBalancerName:string,dBInstanceIdentifier:string,dBSnapshotIdentifier:string,clusterId:string,clusterStates:array<string>,launchConfigurationNames:array<string>,names:array<string>,autoScalingGroupName:string,targetGroupARNs:array<string>,loadBalancerArn:string,protocol:string,port:int,defaultActions:array<struct<targetGroupArn:string,type:string>>,securityGroups:array<string>,type:string,ipAddressType:string,subnetMappings:array<struct<subnetId:string>>,scheme:string,loadBalancerArns:array<string>,healthCheckPort:string,healthCheckPath:string,vpcId:string,healthCheckProtocol:string,cacheClusterId:string,pipelineId:string,input:struct<key:string,frameRate:string,resolution:string,aspectRatio:string,interlaced:string,container:string>,output:struct<key:string,thumbnailPattern:string,rotate:string,presetId:string,watermarks:array<struct<presetWatermarkId:string,inputKey:string>>,composition:array<struct<timeSpan:struct<startTime:string,duration:string>>>>,outputKeyPrefix:string,insightSelectors:array<string>,s3KeyPrefix:string,kmsKeyId:string,eventSelectors:array<struct<readWriteType:string,includeManagementEvents:boolean,dataResources:array<string>,excludeManagementEventSources:array<string>>>,availabilityZoneSet:string,availabilityZoneIdSet:string,spotInstanceRequestIdSet:struct<items:array<struct<spotInstanceRequestId:string>>>,expression:string,executableBySet:string,imagesSet:struct<items:array<struct<imageId:string>>>,ownersSet:string,auditContext:struct<additionalAuditContext:string>,segment:struct<segmentNumber:int,totalSegments:int>,queryExecutionId:string,queryString:string,clientRequestToken:string,queryExecutionContext:struct<database:string,catalog:string>,resultConfiguration:struct<outputLocation:string>,workGroup:string,tableInput:struct<partitionKeys:array<struct<name:string,type:string>>,lastAccessTime:string,storageDescriptor:struct<numberOfBuckets:int,location:string,storedAsSubDirectories:boolean,compressed:boolean,sortColumns:array<string>,columns:array<struct<name:string,type:string>>,outputFormat:string,parameters:string,skewedInfo:struct<skewedColumnValueLocationMaps:string,skewedColumnNames:array<string>,skewedColumnValues:array<string>>,serdeInfo:struct<parameters:struct<serialization.format:string>,serializationLibrary:string>,inputFormat:string,bucketColumns:array<string>>,tableType:string,name:string,isRowFilteringEnabled:boolean,parameters:struct<EXTERNAL:string,avro.schema.literal:string,parquet.compress:string,transient_lastDdlTime:string>,retention:int,owner:string>,instanceTypeSet:struct<items:array<struct<instanceType:string>>>,productDescriptionSet:struct<items:array<struct<productDescription:string>>>,tenancy:string,networkInterfaceSet:struct<items:array<struct<deviceIndex:int,subnetId:string,description:string,deleteOnTermination:boolean,associatePublicIpAddress:boolean,groupSet:struct<items:array<struct<groupId:string>>>>>>,queryExecutionIds:array<string>,partitionInputList:array<struct<storageDescriptor:struct<numberOfBuckets:int,location:string,storedAsSubDirectories:boolean,compressed:boolean,sortColumns:array<string>,columns:array<struct<name:string,type:string>>,outputFormat:string,parameters:string,skewedInfo:struct<skewedColumnValueLocationMaps:string,skewedColumnNames:array<string>,skewedColumnValues:array<string>>,serdeInfo:struct<parameters:struct<serialization.format:string>,serializationLibrary:string>,inputFormat:string,bucketColumns:array<string>>,values:array<string>,lastAccessTime:string>>>' but '\' is found. (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)
  • Needed raw data and not familiar with the error above, so created a table manually to get results:
CREATE EXTERNAL TABLE `manual_cloudtrail_glue_testing_raw`(
  `eventversion` string COMMENT 'from deserializer', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer', 
  `eventtime` string COMMENT 'from deserializer', 
  `eventsource` string COMMENT 'from deserializer', 
  `eventname` string COMMENT 'from deserializer', 
  `awsregion` string COMMENT 'from deserializer', 
  `sourceipaddress` string COMMENT 'from deserializer', 
  `useragent` string COMMENT 'from deserializer', 
  `errorcode` string COMMENT 'from deserializer', 
  `errormessage` string COMMENT 'from deserializer', 
  `requestparameters` string COMMENT 'from deserializer', 
  `responseelements` string COMMENT 'from deserializer', 
  `additionaleventdata` string COMMENT 'from deserializer', 
  `requestid` string COMMENT 'from deserializer', 
  `eventid` string COMMENT 'from deserializer', 
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer', 
  `eventtype` string COMMENT 'from deserializer', 
  `apiversion` string COMMENT 'from deserializer', 
  `readonly` string COMMENT 'from deserializer', 
  `recipientaccountid` string COMMENT 'from deserializer', 
  `serviceeventdetails` string COMMENT 'from deserializer', 
  `sharedeventid` string COMMENT 'from deserializer', 
  `vpcendpointid` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'com.amazon.emr.hive.serde.CloudTrailSerde' 
STORED AS INPUTFORMAT 
  'com.amazon.emr.cloudtrail.CloudTrailInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://xxxx-cloudtrail-glue-testing/'
TBLPROPERTIES (
  'transient_lastDdlTime'='1603852003')
  • select count(*) from manual_cloudtrail_glue_testing_raw returns 187073 events... 124 more than the 186949 events in the parquet table
  • Ran the workflow again manually from Glue console
  • select count(*) from cloudtrail_parquet now has 373908 records, which means 186959 records were added, even though no additional files were being written to the bucket
  • confirmed with additional queries that these were duplicate event records, but 10 records more were added than the first ETL job
  • Ran the workflow a third time manually form the Glue console
  • select count(*) from cloudtrail_parquet now has 560826 records - 186918 were added, again duplicate records but this time 31 records less than the first ETL job

It would require additional testing to be certain, but it appears (at least with static input, as I tested with), that subsequent workflow runs reprocess the same events and convert them to parquet again.
It also appears that, given static input, the output of each subsequent ETL run appears to be inconsistent (ie on each run, a different number of events were converted to parquet).

I'm curious to see if anyone has gotten this to work properly without modifications to the Crawlers or ETL job?
If so, were you able to query the raw log table? Did multiple runs on the same dataset produce duplicate results for you? Has anyone else tried to run a static dataset through this workflow multiple times and compare the output (parquet) to the input (raw) event data to validate the consistency of the ETL process?

Thanks in advance - looking forward to hopefully learning a little more about this!

Initialization help

In order to use this project, is it sufficient to create the 3 new S3 buckets and then fill in the values in the variables.tf file and then run terraform apply? That is what I did and am not having success. The terraform command ran successfully and output glue_workflow_id = CloudTrailParquetGlue. I can see I have an Athena database named cloudtrail and a table named after my S3 bucket that contains my original cloudtrail logs raw_CLOUDTRAIL_BUCKET (where CLOUDTRAIL_BUCKET is the name of my bucket). I have two Glue crawlers CloudTrailParquetCrawler and CloudTrailRawCrawler which I ran manually and they seem to be successful as their logs look like this:
image

I then ran the ETL job CloudTrailToParquet and got the following error message:

AnalysisException: u'Partition column `day` not found in schema

image

My parquet and temp S3 buckets are empty.

I am running this against an S3 bucket that I had copied CloudTrail logs to, but they retain their directory structure.

Question - Data retention and Bucket Lifecycle policies

Thanks for sharing this project and the extensive blog post behind it. I've got a few questions -

While I've manually setup athena to make adhoc queries against cloudtrail buckets on an as needed basis, we're considering our options for automating to have athena searchable on a daily basis across our cloudtrail logs, and your approach sounds interesting - even more appealing as we use terraform. I'm trying to catch up and learn about about AWS Glue as I have no direct experience with it nor with parquet format.

I'm curious how this solution is effected by the number of accounts/regions writing cloudtrail to the raw source bucket from the org and the number of days this data kept in the bucket before being purged by a lifecycle_rule. For example, what if we keep 6 months of log files for 30+ accounts with activity in 5 different regions. ( not exactly sure what our final retention policy will be... still working that out)

Does the crawler only crawl new objects uploaded since the last run, or does it need to crawl the entire bucket every day? What happens when objects expire via the lifecycle rule in the source bucket? Would the objects in the bucket holding the transformed parquet files get purged automatically, or would a lifecycle rule need to be written on the target bucket as well? Are the table partitions actively pruned to keep up with dates that have expired as a result of the lifecycle rule?

I will try to schedule some time in the coming week or two to try this out and that might shed light on my questions.

Solution does not scale for large existing AWS accounts.

I have a large existing AWS account with a few years of CloudTrail log data already in my source S3 bucket. When deploying this solution and manually running the first crawler it runs, but does not finish, at least in any reasonable amount of time. I ran the CloudTrailRawCrawler crawler for 24 hours and it didn't finish the first crawl of the source CloudTrail bucket. I suspect this is due to a few years worth of daily partitions and very large number of small existing CloudTrail log files. Not this source S3 bucket only contains the CloudTrail logs for one account that is 99% dominated by 1 AWS region. So, there aren't an unreasonable number of partitions to crawl.

Is there any way to speed up or parallelize the initial crawl?

Recrawl Policy

I've noticed that the glue crawlers are currently setup to crawl all folders each time it's ran. Shouldn't it be changed to only crawl new folders only?I have millions of files to crawl through and it takes forever. If it's only set to crawl the new folders, it should make the process much faster.

I believe if we implement the Recrawl Policy on the terraform, this would solve the issue.

No AWS resource tags.

To be really useful in a production account sense this module needs to support some form of tagging on the AWS resources created. I suggest something like a list of tags that get applied to each resource that gets created.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.