Speeding up S3 object processing — Part 1

AWS S3 is the go-to cloud-based object storage service widely used in the industry today. But as you become an S3 power user, some of the standard ways of copying, deleting, and listing objects start to show their limitations.

For example: how does one copy objects from one S3 prefix to another? Here is some sample code using Python and boto3:

import boto3

s3_client = boto3.client('s3')
kwargs = {'Bucket': 'src_bucket', 'Prefix': 'src_prefix'}
continuation_token = 'DUMMY_VAL'
while continuation_token:
    if continuation_token != 'DUMMY_VAL':
        kwargs['ContinuationToken'] = continuation_token
    objects_response = s3_client.list_objects_v2(**kwargs)
    # 'Contents' is absent when the listing comes back empty
    for obj in objects_response.get('Contents', []):
        object_key = obj['Key']
        copy_source = {
            'Bucket': 'src_bucket',
            'Key': object_key
        }
        # Bucket() only exists on the resource API; the client exposes copy_object
        s3_client.copy_object(Bucket='dest_bucket', Key=object_key, CopySource=copy_source)
    try:
        continuation_token = objects_response['NextContinuationToken']
    except KeyError:
        continuation_token = None

This would be the standard way to do it. As an aside, note that I have used the list_objects_v2 API rather than the older list_objects API; both return at most 1,000 keys per call, which is why the loop above paginates with continuation tokens.
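If you prefer not to manage continuation tokens by hand, boto3 also offers a built-in paginator for list_objects_v2. Here is a minimal sketch (the bucket and prefix names are just placeholders):

import boto3

s3_client = boto3.client('s3')

# get_paginator follows the continuation tokens for you, one page per iteration
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='src_bucket', Prefix='src_prefix'):
    for obj in page.get('Contents', []):
        print(obj['Key'])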

So what could go wrong with this code?

Functionally, there is nothing wrong with the code. But how well does it scale, say for copying 100 objects? How about 1,000? Or a million? Here is the interesting aspect: this code takes time linear in the number of objects you want to copy, and that is not really scalable, especially in contexts like a Lambda where the runtime is limited to a maximum of 15 minutes. If you are doing more things in the Lambda in addition to copying, the rest of the code may never get to run because the function times out at the 15-minute mark.

Which brings us to the idea of doing things in parallel or concurrently. Here I will present a solution that uses Python's multithreading facilities to copy objects in parallel.

First, some plumbing code to create about 10,000 S3 objects so we have some test data:

import random
import string

import boto3

s3 = boto3.client('s3')

def get_random_string(length):
    # Build a random lowercase string to use as the object body
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def populate_data(bucket, prefix, num_files, str_length):
    for n in range(num_files):
        s3_key = f"{prefix}/{n:07}.txt"
        print(s3_key)
        random_string = get_random_string(str_length)
        s3.put_object(Body=random_string, Bucket=bucket, Key=s3_key)

I basically plonked the above code into a Lambda and soon had the requisite number of dummy objects in S3.
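For completeness, here is a minimal handler sketch along those lines; the bucket name, prefix, count, and object size are placeholders I picked for illustration, not the exact values from my setup:

def lambda_handler(event, context):
    # Hypothetical values: 10,000 objects of 1 KB each under 'src_prefix'
    populate_data('src_bucket', 'src_prefix', num_files=10000, str_length=1024)
    return {'status': 'done'}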

Next, let's do an S3 copy in a serial fashion:

def copy_object(src_bucket, src_prefix, dest_bucket, dest_prefix, s3_key):
    # Keep the file name but swap the source prefix for the destination prefix
    s3_filename = s3_key[(s3_key.index('/') + 1):]
    dest_key = f"{dest_prefix}/{s3_filename}"
    s3.copy_object(Bucket=dest_bucket, Key=dest_key,
                   CopySource={'Bucket': src_bucket, 'Key': s3_key})

def copy_serial(src_bucket, src_prefix, dest_bucket, dest_prefix):
    for s3_objects in s3_lister(src_bucket, src_prefix):
        for s3_key in s3_objects:
            copy_object(src_bucket, src_prefix, dest_bucket, dest_prefix, s3_key)
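The copy_serial function relies on an s3_lister helper that yields batches of keys, one batch per listing page. Its exact implementation isn't shown here, but a minimal sketch built on the paginator mentioned earlier could look like this:

def s3_lister(bucket, prefix):
    # Yield one batch of keys per list_objects_v2 page (up to 1,000 keys each)
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        yield [obj['Key'] for obj in page.get('Contents', [])]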

I timed the copy_serial method: copying 2,000 S3 objects averaged about 170 seconds with a memory usage of 84 MB.
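For reference, one simple way to take such a measurement (this is just a sketch; the duration reported in the Lambda invocation logs works equally well):

import time

start = time.perf_counter()
copy_serial('src_bucket', 'src_prefix', 'dest_bucket', 'dest_prefix')
print(f"copy_serial took {time.perf_counter() - start:.1f} seconds")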

Next, let us use Python's multithreading capabilities to parallelize the S3 copy operations. Here is the code snippet:

import concurrent.futures

def copy_parallel(src_bucket, src_prefix, dest_bucket, dest_prefix):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for s3_objects in s3_lister(src_bucket, src_prefix):
            for s3_key in s3_objects:
                futures.append(executor.submit(
                    copy_object, src_bucket=src_bucket, src_prefix=src_prefix,
                    dest_bucket=dest_bucket, dest_prefix=dest_prefix, s3_key=s3_key))
        # Surface any exception raised inside a worker thread
        for future in concurrent.futures.as_completed(futures):
            future.result()

Parallelizing certainly helped. With 10 threads chugging away, the copy took about 123 seconds, a significant improvement over the serial execution time of 170 seconds. However, we pay for it with higher memory usage: 98 MB vs. 84 MB for the serial approach. Since this Lambda was allocated 128 MB, that is still comfortably under the memory limit.

What happens if we increase the number of threads to 20? Interestingly enough, there is a regression in performance: the copy takes more time (129 seconds) and more memory (105 MB). I did see warnings about exceeding the boto3 connection pool size ("Connection pool is full, discarding connection: …") and wondered if this was the limiting factor. It turns out the default pool size is 10, so I raised the pool size to 20 to match the number of threads in the ThreadPoolExecutor. Doing this got rid of the warnings but actually worsened the runtime even further, to 132 seconds! So simply dialing up the parallelism is not a panacea for all our slow-performing code. Also, if the underlying CPUs are fewer than the threads, we can see this kind of performance degradation. So as a final attempt, I decided to set the worker count to os.cpu_count() + 4, which (capped at 32) is the default parallelism Python uses for ThreadPoolExecutor from version 3.8 onwards.
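For reference, here is a sketch of how those two knobs can be set; the pool size of 20 and the worker-count expression mirror the values discussed above:

import os
import concurrent.futures

import boto3
from botocore.config import Config

# botocore's connection pool defaults to 10; raise it to match the thread count
s3 = boto3.client('s3', config=Config(max_pool_connections=20))

# Python 3.8+ defaults to min(32, os.cpu_count() + 4) when max_workers is not given
max_workers = min(32, (os.cpu_count() or 1) + 4)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)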

Voilà! That gave the best runtime of 113 seconds, and the memory usage also dropped to 94 MB. So the lesson learned is that the useful level of parallelism is tied to how many cores the OS is provisioned with.

In conclusion, parallelism helps, but the level of parallelism needs to be tuned to extract the best performance from a given system.

Hope you enjoyed this article! In the next part of this series, I will discuss how we can use concurrency to further improve S3 copy performance.

Head over to part 2 of this story

