# Welding classification

This model recognizes the welding action by identifying the light and sparks emitted by the process. The model operates under different lighting conditions. It can be utilized via a standard set of video surveillance cameras with low video resolution. It can be applied to measure the efficiency of welding worker operation.

If you have any questions, feel free to contact us at info@agmis.eu or fill out our [contact form](https://agmis.lt/contacts/).

In [None]:
import base64
import io
import os
import re
import time
import uuid

import boto3
import cv2
import numpy as np
import sagemaker as sage

from datetime import datetime

from matplotlib.pyplot import imshow
from PIL import Image as PILImage
from IPython.display import Image, display
from sagemaker import get_execution_role

%matplotlib inline

In [None]:
def get_current_time():
    """Return the current local time as a ``YYYY-MM-DD-HH-MM-SS`` string."""
    now = datetime.now()
    return f'{now:%Y-%m-%d-%H-%M-%S}'

In [None]:
# SageMaker SDK session (the `sagemaker` package is imported as `sage`).
sess = sage.Session()
# IAM role of the notebook; passed as the model's ExecutionRoleArn when the model is created.
role = get_execution_role()

In [None]:
# Timestamp captured once so the model, endpoint config and endpoint names share one suffix.
creation_time = get_current_time()

### Create model from model package

In [None]:
# Low-level boto3 SageMaker client used for the create_*/describe_* calls in this notebook.
sagemaker = boto3.client(service_name='sagemaker')

In [None]:
# ARN of the model package to deploy; replace the placeholder before running.
model_package_arn = '<enter_model_package_arn>'
# The creation timestamp is appended to the model name.
model_name = f'welding-classifier-{creation_time}'

In [None]:
# CreateModel request: the model is backed by the model package and is
# created with network isolation enabled.
primary_container = {'ModelPackageName': model_package_arn}
create_model_dict = dict(
    ModelName=model_name,
    PrimaryContainer=primary_container,
    ExecutionRoleArn=role,
    EnableNetworkIsolation=True,
)

model = sagemaker.create_model(**create_model_dict)
# Last expression of the cell: shows the description of the model just created.
sagemaker.describe_model(ModelName=model_name)

## S3 bucket

Here is a public S3 bucket where you can find images of welding.

Input images dir: _welding-input-imgs_

Output files dir: _welding-output_

In [None]:
# Public bucket and directory that hold the sample welding images.
bucket = 'sagemaker-storage-data'
input_dir_name = 'welding-input-imgs'
s3_input_data_path = f's3://{bucket}/{input_dir_name}/'

# Results will be saved in your own bucket; replace both placeholders.
# Example output files can be found in the welding-output directory of the
# sagemaker-storage-data S3 bucket.
output_bucket_name = '<enter_your_s3_bucket_name>'
output_dir_name = '<enter_output_directory_name>'
s3_output_data_path = f's3://{output_bucket_name}/{output_dir_name}/'

In [None]:
# Collect the keys of all objects under the input prefix.
# A paginator is used because a single list_objects_v2 call returns a limited
# number of keys, and a response with no matching objects has no 'Contents'
# entry at all (indexing it directly would raise KeyError).
s3client = boto3.client('s3', region_name='us-east-2')
paginator = s3client.get_paginator('list_objects_v2')
# Key that is just the prefix followed by '/'; it is not an image, so skip it.
prefix_placeholder_key = input_dir_name + '/'
images_list = []
for page in paginator.paginate(Bucket=bucket, Prefix=input_dir_name):
    for obj in page.get('Contents', []):
        if obj['Key'] != prefix_placeholder_key:
            images_list.append(obj['Key'])
images_list

## Batch transform jobs

In [None]:
# A fresh timestamp is appended to the transform job name on every run of this cell.
batch_job_name = f'welding-{get_current_time()}'

Here is the batch transform job configuration. Choosing an `InstanceType` with less compute power than `ml.c4.8xlarge` is not recommended.

In [None]:
# Input side: every object under the S3 prefix, sent as an image without
# compression or splitting.
transform_input = {
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": s3_input_data_path,
        },
    },
    "ContentType": "application/x-image",
    "CompressionType": "None",
    "SplitType": "None",
}

# Output side: text results written under the output S3 path, assembled by line.
transform_output = {
    "S3OutputPath": s3_output_data_path,
    "Accept": "text/*",
    "AssembleWith": "Line",
}

# Compute resources for the job.
transform_resources = {
    "InstanceType": "ml.c4.8xlarge",
    "InstanceCount": 1,
}

batch_job_config = {
    "TransformJobName": batch_job_name,
    "BatchStrategy": "SingleRecord",
    "MaxConcurrentTransforms": 4,
    "MaxPayloadInMB": 10,
    "ModelName": model_name,
    "TransformInput": transform_input,
    "TransformOutput": transform_output,
    "TransformResources": transform_resources,
}

sagemaker.create_transform_job(**batch_job_config)
print('Created batch transform job ', batch_job_name)

Check the status of the created transform job every 30 seconds until it completes.

In [None]:
# Poll the transform job every 30 seconds until it reaches a terminal state.
while True:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response['TransformJobStatus']
    if status == 'Completed':
        print("Transform job ended with status: " + status)
        break
    if status == 'Failed':
        # Read the reason defensively so a missing key cannot mask the failure.
        message = response.get('FailureReason', 'no failure reason reported')
        print('Transform failed with the following error: {}'.format(message))
        raise Exception('Transform job failed')
    if status == 'Stopped':
        # A stopped job never becomes 'Completed'; without this branch the
        # loop would keep polling forever.
        raise Exception('Transform job was stopped before completing')
    print('Transform job is still in status: ' + status)
    time.sleep(30)

## Endpoint

### Create endpoint config

In [None]:
endpoint_config_name = f'welding-endpoint-config-{creation_time}'

# Single production variant: one ml.m5.large instance serving the model.
all_traffic_variant = {
    'InstanceType': 'ml.m5.large',
    'InitialVariantWeight': 1,
    'InitialInstanceCount': 1,
    'ModelName': model_name,
    'VariantName': 'AllTraffic',
}
create_endpoint_config_response = sagemaker.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[all_traffic_variant],
)

print('Arn of endpoint config ' + create_endpoint_config_response['EndpointConfigArn'])

### Create endpoint

In [None]:
endpoint_name = f'welding-endpoint-{creation_time}'
print(endpoint_name)

# Create the endpoint from the endpoint config defined above.
create_endpoint_response = sagemaker.create_endpoint(
    EndpointConfigName=endpoint_config_name,
    EndpointName=endpoint_name,
)
print(create_endpoint_response['EndpointArn'])

# Initial description of the endpoint; its status is read in the next cell.
resp = sagemaker.describe_endpoint(EndpointName=endpoint_name)

Check the status of the created endpoint every 30 seconds until creation finishes.

In [None]:
# Poll the endpoint every 30 seconds for as long as it is still being created.
status = resp['EndpointStatus']
print('Status: ' + status)

while status == 'Creating':
    time.sleep(30)
    resp = sagemaker.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print('Status: ' + status)

if status == 'Failed':
    # Stop here instead of letting later cells fail when they invoke an
    # endpoint that was never brought up.
    reason = resp.get('FailureReason', 'no failure reason reported')
    raise Exception('Endpoint creation failed: {}'.format(reason))

print('Arn: ' + resp['EndpointArn'])
print('Status: ' + status)

In [None]:
# Runtime client; invoke_endpoint() below uses it to send inference requests.
runtime = boto3.Session().client(service_name='runtime.sagemaker')

In [None]:
def read_image_from_s3(bucket, key, region_name='us-east-2'):
    """Download an image from S3 and return its bytes plus the decoded image.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the image object inside the bucket.
        region_name: AWS region used to create the S3 resource.

    Returns:
        A ``(image_bytes, pil_image)`` tuple: the object's bytes exactly as
        stored in S3, and a PIL image opened from those bytes.
    """
    s3 = boto3.resource('s3', region_name=region_name)
    # Read the body once and keep the stored bytes untouched: saving the image
    # again through PIL would re-encode it (lossy for JPEG) before inference.
    image_bytes = s3.Bucket(bucket).Object(key).get()['Body'].read()
    pil_image = PILImage.open(io.BytesIO(image_bytes))

    return image_bytes, pil_image

In [None]:
def invoke_endpoint(endpoint_name, bytes_image):
    """Send image bytes to a SageMaker endpoint and return its numeric answer.

    Args:
        endpoint_name: Name of the endpoint to invoke.
        bytes_image: Encoded image bytes; sent with content type ``image/jpeg``.

    Returns:
        The response body converted to ``float`` (callers treat it as the
        probability of welding).
    """
    result = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='image/jpeg',
        Body=bytes_image,
    )
    payload = result['Body'].read()
    return float(payload)

In [None]:
def predict_welding(bucket, img_path):
    """Fetch an image from S3 and score it with the endpoint.

    The endpoint is taken from the module-level ``endpoint_name``.

    Args:
        bucket: Name of the S3 bucket that holds the image.
        img_path: Key of the image object inside the bucket.

    Returns:
        A ``(probability, pil_image)`` tuple.
    """
    image_payload, picture = read_image_from_s3(bucket, img_path)
    score = invoke_endpoint(endpoint_name, image_payload)
    return score, picture

### Process images

In [None]:
# Score every listed sample image and show it together with its result.
for image_key in images_list:
    prob, pil_im = predict_welding(bucket, image_key)
    display(pil_im)
    # The trailing blank line separates consecutive results.
    print('Probability of welding: ', prob, end='\n\n')

### Process video

In [None]:
# Open the sample video directly from its URL.
video_path = 'https://sagemaker-storage-data.s3.us-east-2.amazonaws.com/welder.mpeg'
video = cv2.VideoCapture(video_path)

# The output file reuses the frame rate and frame size reported by the source.
fps = video.get(cv2.CAP_PROP_FPS)
frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_size = (frame_width, frame_height)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_video = cv2.VideoWriter('welding.avi', fourcc, fps, frame_size)

In [None]:
# Annotate each frame of the video with the endpoint's prediction and write it
# to the output file.
# Frames are scored against the endpoint created earlier in this notebook
# (`endpoint_name`); assign `endpoint_name` yourself before running this cell
# to use a different, already deployed endpoint.
try:
    while video.isOpened():
        return_value, frame = video.read()
        if not return_value:
            # No more frames could be read.
            break
        # Convert BGR -> RGB before handing the frame to PIL for JPEG encoding.
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        imgByteArr = io.BytesIO()
        im = PILImage.fromarray(img)
        im.save(imgByteArr, format='JPEG')
        imgByteArr = imgByteArr.getvalue()
        probability = invoke_endpoint(endpoint_name, imgByteArr)
        # Draw the result as a percentage onto the original (BGR) frame.
        cv2.putText(frame, f'welding: {probability * 100:6.2f} %', (10, 50), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=2, color=(0, 255, 0), thickness=2)
        output_video.write(frame)
finally:
    # Release the capture and the writer even if inference fails mid-video,
    # so the output file is closed and the source handle is freed.
    video.release()
    output_video.release()