Configuration - Optional Transformations

Scanner can transform your logs during ingestion. Below are the types of transformations you can configure for your import rules.

Note that all data added by transformations will count against your ingestion volume.

Normalize to ECS (Elastic Common Schema)

Add normalized ECS (Elastic Common Schema) fields to the log events.

Parameters

  • Log Source: One of the 12 log sources for which Scanner provides out-of-the-box normalization.

Example

// Parameters
// Log source: "CloudTrail"

// Input log event
{
    "eventName": "CreateBucket",
    "awsRegion": "us-east-1",
    "recipientAccountId": "123456789012",
    "eventSource": "s3.amazonaws.com",
    "requestID": "request-1234567890",
    "sourceIPAddress": "192.168.1.1",
    "userAgent": "aws-cli/2.2.0 Python/3.8.10",
    "userIdentity": {
        "arn": "arn:aws:iam::123456789012:user/john.doe",
        "userName": "john.doe",
        "type": "IAMUser"
    },
}

// Output log event
{
    // Normalized fields are added under a new `@ecs` object
    "@ecs": {
        "event": { "action": "CreateBucket", "outcome": "success" },
        "cloud": {
            "provider": "aws",
            "region": "us-east-1",
            "account": { "id": "123456789012" },
            "service": { "name": "s3.amazonaws.com" },
        },
        "http": { "request_id": "request-1234567890" },
        "source": { "ip": "192.168.1.1" },
        "user_agent": "aws-cli/2.2.0 Python/3.8.10",
        "user": { "id": "arn:aws:iam::123456789012:user/john.doe", "name": "john.doe" },
    },
    // Existing fields are still included and unchanged
    "eventName": "CreateBucket",
    "awsRegion": "us-east-1",
    "recipientAccountId": "123456789012",
    "eventSource": "s3.amazonaws.com",
    "requestID": "request-1234567890",
    "sourceIPAddress": "192.168.1.1",
    "userAgent": "aws-cli/2.2.0 Python/3.8.10",
    "userIdentity": {
        "arn": "arn:aws:iam::123456789012:user/john.doe",
        "userName": "john.doe",
        "type": "IAMUser"
    },
}
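
Conceptually, the normalization is a field mapping from the source schema onto ECS. As a rough illustration only (not Scanner's actual implementation), a CloudTrail-to-ECS mapping matching the example above could look like the Python sketch below; treating the presence of an errorCode field as a failure is an assumption made for illustration.

# Illustrative sketch only -- not Scanner's actual implementation.
# Maps a CloudTrail event onto ECS-style fields, matching the example above.

def normalize_cloudtrail_to_ecs(event: dict) -> dict:
    user = event.get("userIdentity", {})
    ecs = {
        "event": {
            "action": event.get("eventName"),
            # Assumption: CloudTrail reports failures via an errorCode field
            "outcome": "failure" if event.get("errorCode") else "success",
        },
        "cloud": {
            "provider": "aws",
            "region": event.get("awsRegion"),
            "account": {"id": event.get("recipientAccountId")},
            "service": {"name": event.get("eventSource")},
        },
        "http": {"request_id": event.get("requestID")},
        "source": {"ip": event.get("sourceIPAddress")},
        "user_agent": event.get("userAgent"),
        "user": {"id": user.get("arn"), "name": user.get("userName")},
    }
    # Normalized fields are added alongside the original, unchanged columns
    return {**event, "@ecs": ecs}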

Extract by Regex

Extract values from a specified column using a regex and add them as new columns.

Parameters

  • Extract from column: The existing column to extract from.

  • Regex:

    • The regex pattern to apply to the column value. Must contain at least one named capture group.

    • The names of the capture groups are used as the new column names.

Example

// Parameters
// Column: "message"
// Regex: "status: (?P<status_code>[\d\.]+), request_id: (?P<request_id>[\d\.]+)"

// Input log event
{
    "message": "Request succeeded. status: 200, request_id: 123",
}

// Output log event
{
    "message": "Request succeeded. status: 200, request_id: 123",
    // Parsed columns are added under a new `.%regex` object
    "message.%regex": {
        // Extracted values are always strings
        "status_code": "200",
        "request_id": "123",
    },
}
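
In other words, each named capture group becomes one new column under the `.%regex` object. A minimal Python sketch of this behavior (illustrative only; Scanner's regex dialect may differ in details):

# Illustrative sketch of "Extract by Regex" -- not Scanner's actual implementation.
import re

def extract_by_regex(event: dict, column: str, pattern: str) -> dict:
    value = event.get(column)
    if not isinstance(value, str):
        return event
    match = re.search(pattern, value)
    if match is None:
        return event
    # Named capture groups become the new column names; values stay strings
    return {**event, f"{column}.%regex": match.groupdict()}

event = {"message": "Request succeeded. status: 200, request_id: 123"}
pattern = r"status: (?P<status_code>[\d\.]+), request_id: (?P<request_id>[\d\.]+)"
print(extract_by_regex(event, "message", pattern))
# {'message': ..., 'message.%regex': {'status_code': '200', 'request_id': '123'}}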

Extract Timestamp

Every log event in Scanner must have a timestamp. This transformation specifies the column from which the timestamp is extracted. Every import rule must have at least one Extract Timestamp step.

Supported Timestamp Formats

Scanner supports various timestamp formats, including:

  • ISO 8601

  • RFC 3339

  • Unix epoch timestamp (seconds/milliseconds/microseconds/nanoseconds since epoch)

The best way to check whether timestamps are extracted correctly is to use the import preview tool. A warning will appear if Scanner fails to extract the timestamps from the specified columns.

Fallbacks

You may specify additional "Extract Timestamp" steps as fallbacks. This is useful if the logs from the same source are heterogeneous (i.e. they don't all have the same timestamp field).

If all steps fail (e.g. none of the specified columns are present), Scanner will make a best guess based on:

  • The timestamp of preceding log events in the same file, or

  • The S3 file's "last modified" timestamp.

Parameters

  • Extract from column: The column that contains the timestamp, or the string column from which it will be extracted.

  • Regex (optional):

    • If the timestamp needs to be extracted from a string column (e.g. a "message" column), the regex is used to extract the value.

    • Must have exactly one capture group for the timestamp value.

    • Not needed if the column value contains just the timestamp.

    • Does not apply if the column value is not a string.

Example

// Parameters (with multiple steps defined)
// 1. Column: "started_at", Regex: (none)
// 2. Column: "event.timestamp", Regex: (none)
// 3. Column: "message", Regex: "^(\S+)\s"

// Input log events (from the same file)
{
    "time": "2023-04-05T12:34:56.789Z",
    "started_at": "2023-04-05T12:34:56.123Z",
    "request_id": "123",
    "message": "2023-04-05T12:34:56.234Z INFO Handling request",
}
{
    "request_id": "123",
    "message": "2023-04-05T12:34:56.345Z ERROR Request failed",
}
{
    "request_id": "123",
    "event": {
        "type": "Some event type",
        "timestamp": 1680698096567, // milliseconds since epoch
    },
}

// Extracted timestamps
"2023-04-05T12:34:56.123"
"2023-04-05T12:34:56.345"
"2023-04-05T12:34:56.567" // 1680698096567 milliseconds since epoch

Parse JSON Columns

Parses string columns that contain stringified JSON objects or arrays so that their structure is reflected and indexed in Scanner.

Example

// Input log event
{
    "logStream": "abcd1234",
    "message": "{\"elapsed_ms\":238,\"status\":\"200\",\"request_id\":\"123\"}",
}

// Output log event
{
    "logStream": "abcd1234",
    // The original column is preserved
    "message": "{\"elapsed_ms\":238,\"status\":\"200\",\"request_id\":\"123\"}",
    // Parsed JSON object is added under a new `.%json` object
    "message.%json": {
        "elapsed_ms": 238,
        "status": "200",
        "request_id": "123",
    },
}
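
A minimal Python sketch of this behavior (illustrative only; not Scanner's actual implementation):

# Illustrative sketch of "Parse JSON Columns" -- not Scanner's actual
# implementation. String columns holding a JSON object or array get a
# parsed sibling column named "<column>.%json".
import json

def parse_json_columns(event: dict) -> dict:
    output = dict(event)
    for column, value in event.items():
        if not isinstance(value, str):
            continue
        stripped = value.strip()
        if not stripped.startswith(("{", "[")):
            continue
        try:
            output[f"{column}.%json"] = json.loads(stripped)
        except json.JSONDecodeError:
            pass  # leave strings that are not valid JSON untouched
    return output

event = {
    "logStream": "abcd1234",
    "message": '{"elapsed_ms":238,"status":"200","request_id":"123"}',
}
print(parse_json_columns(event)["message.%json"])
# {'elapsed_ms': 238, 'status': '200', 'request_id': '123'}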

Parse Key-Value Columns

Parses all "key=value" pairs from all string columns.

Note that there isn't a single widely adopted standard for key-value pairs. If the Scanner implementation does not parse your logs correctly (or is too noisy), you should use the "Extract by Regex" transformation instead.

Example

// Input log event
{
    "logStream": "abcd1234",
    "message": "Finished running worker. elapsed_ms=238, status=200, request_id=123",
}

// Output log event
{
    "logStream": "abcd1234",
    "message": "Finished running worker. elapsed_ms=238, status=200, request_id=123",
    // Parsed fields are added under a new `.%kv` object. All values will be strings.
    "message.%kv": {
        "elapsed_ms": "238",
        "status": "200",
        "request_id": "123",
    },
}
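
As a rough illustration of how "key=value" pairs could be tokenized (not Scanner's actual implementation, which may split pairs differently):

# Illustrative sketch of "Parse Key-Value Columns" -- not Scanner's actual
# implementation. All parsed values are kept as strings.
import re

KV_PATTERN = re.compile(r"(\w+)=([^\s,]+)")

def parse_kv_columns(event: dict) -> dict:
    output = dict(event)
    for column, value in event.items():
        if not isinstance(value, str):
            continue
        pairs = dict(KV_PATTERN.findall(value))
        if pairs:
            output[f"{column}.%kv"] = pairs
    return output

event = {
    "logStream": "abcd1234",
    "message": "Finished running worker. elapsed_ms=238, status=200, request_id=123",
}
print(parse_kv_columns(event)["message.%kv"])
# {'elapsed_ms': '238', 'status': '200', 'request_id': '123'}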

Unroll Array

Transforms one log event into multiple events by unrolling an array column. This is useful when the actual events are wrapped in an array inside a single object.

All fields other than the "unroll column" will be duplicated for each log event.

Parameters

  • Unroll column: The column to be unrolled.

Example

// Parameters
// Column: "events"

// Input log event
{
    "timestamp": "2023-04-05T12:34:56Z",
    "user": "john@example.com",
    "user_ip": "192.168.1.1",
    "events": [
        { "action": "foo", "outcome": "success" },
        { "action": "bar", "outcome": "failure", "error": "AccessDenied" },
        { "action": "baz", "outcome": "success" },
    ],
}

// Output log events (multiple)
{
    // Other columns are copied for each unrolled log event
    "timestamp": "2023-04-05T12:34:56Z",
    "user": "john@example.com",
    "user_ip": "192.168.1.1",
    // The original `events` array becomes a single object
    "events": {
        // The index from the original array is added to every log event
        "%idx": 0,
        "action": "foo",
        "outcome": "success",
    },
}
{
    "timestamp": "2023-04-05T12:34:56Z",
    "user": "john@example.com",
    "user_ip": "192.168.1.1",
    "events": {
        "%idx": 1,
        "action": "bar",
        "outcome": "failure",
        "error": "AccessDenied",
    },
}
{
    "timestamp": "2023-04-05T12:34:56Z",
    "user": "john@example.com",
    "user_ip": "192.168.1.1",
    "events": {
        "%idx": 2,
        "action": "baz",
        "outcome": "success",
    },
}
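
As a rough sketch of the unrolling behavior (illustrative only; not Scanner's actual implementation):

# Illustrative sketch of "Unroll Array" -- not Scanner's actual implementation.
# One event with an N-element array becomes N events; the other columns are
# copied, and the original array index is kept under "%idx".

def unroll_array(event: dict, column: str) -> list:
    items = event.get(column)
    if not isinstance(items, list):
        return [event]  # nothing to unroll
    common = {k: v for k, v in event.items() if k != column}
    return [
        {**common, column: {"%idx": idx, **item}}
        for idx, item in enumerate(items)
    ]

event = {
    "timestamp": "2023-04-05T12:34:56Z",
    "user": "john@example.com",
    "events": [
        {"action": "foo", "outcome": "success"},
        {"action": "bar", "outcome": "failure", "error": "AccessDenied"},
    ],
}
for unrolled in unroll_array(event, "events"):
    print(unrolled["events"]["%idx"], unrolled["events"]["action"])
# 0 foo
# 1 bar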