If you take a closer look at the API documentation, you will find that the last login date and time is returned as LastKnownUserConnectionTimestamp
by the DescribeWorkspacesConnectionStatus API!
DescribeWorkspacesConnectionStatus
//Response Syntax
{
"NextToken": "string",
"WorkspacesConnectionStatus": [
{
"ConnectionState": "string",
"ConnectionStateCheckTimestamp": number,
"LastKnownUserConnectionTimestamp": number, //<-this!
"WorkspaceId": "string"
}
]
}
However, it seems that this API accepts at most 25 WorkspaceIds per request. If you have more than 25 WorkSpaces, you cannot retrieve the connection status of all of them in a single call.
WorkspaceIds The identifiers of the WorkSpaces. You can specify up to 25 WorkSpaces.
getInstances.py
import boto3
import csv
# Boto3 client for the Amazon WorkSpaces API and the paginators built on it
# (paginator_connection is not used in the visible part of this script)
client = boto3.client('workspaces')
paginator_describe = client.get_paginator('describe_workspaces')
paginator_connection = client.get_paginator('describe_workspaces_connection_status')

# Rows destined for the CSV file; the first row is the column header
data = [
    ["UserName", "WorkspaceId", "Directory", "State", "ComputerType", "LastLogin"],
]

# IDs of the directories whose WorkSpaces are listed
DirectoryIds = ['d-XXXXXXXX', 'd-YYYYYYYY']
# Record one directory in the module-level Directory ID -> alias lookup table
# @param direcotry directory description dict holding "DirectoryId" and "Alias"
def addDirectoryMapping(direcotry):
    alias = direcotry["Alias"]
    directory_id = direcotry["DirectoryId"]
    directoryMapping[directory_id] = alias
# Build the Directory ID -> alias lookup table from every page of
# describe_workspace_directories results
directoryMapping = {}
paginator = client.get_paginator('describe_workspace_directories')
page_iterator = paginator.paginate()
for page in page_iterator:
    # Loop explicitly: map() is lazy on Python 3, so using it only for its
    # side effect would never call addDirectoryMapping and the table would
    # stay empty
    for directory in page["Directories"]:
        addDirectoryMapping(directory)
print("Direcotry mapping is DONE")
# Look up the name (alias) registered for a directory
# @param direcotyrId Directory ID
# @return alias stored in directoryMapping, or "Other" when the ID is not mapped
def findDirectoryName(direcotyrId):
    return directoryMapping.get(direcotyrId, "Other")
# Fetch the last known user connection time of a single WorkSpace
# @param workspaceId WorkSpace ID
# @return the LastKnownUserConnectionTimestamp value, "unknown" when the
#         status entry has no such field, or "Error" when no status entry
#         was returned for the ID
def findLatestLoginTimeById(workspaceId):
    response = client.describe_workspaces_connection_status(WorkspaceIds=[workspaceId])
    statuses = response["WorkspacesConnectionStatus"]
    if len(statuses) == 0:
        return "Error"
    return statuses[0].get("LastKnownUserConnectionTimestamp", "unknown")
# Build one CSV row for a WorkSpace
# @param workspaces one entry of the "Workspaces" list from describe_workspaces
# @return list in CSV column order:
#         UserName, WorkspaceId, Directory, State, ComputerType, LastLogin
def parse_list(workspaces):
    # Built as a literal instead of a local named "list", which would shadow
    # the builtin; elements are still evaluated in the same left-to-right order
    return [
        workspaces["UserName"],
        workspaces["WorkspaceId"],
        findDirectoryName(workspaces["DirectoryId"]),
        workspaces["State"],
        workspaces["WorkspaceProperties"]["ComputeTypeName"],
        # Issues one connection-status request per WorkSpace
        findLatestLoginTimeById(workspaces["WorkspaceId"]),
    ]
# Extract the WorkSpace ID and last login time from one connection status entry
# @param WorkspacesConnectionStatus one entry of the "WorkspacesConnectionStatus" list
# @return [WorkspaceId, LastKnownUserConnectionTimestamp], with "unknown" in
#         place of the timestamp when the entry does not carry one
def parse_list_connection(WorkspacesConnectionStatus):
    # Returned as a literal instead of a local named "list", which would
    # shadow the builtin
    return [
        WorkspacesConnectionStatus["WorkspaceId"],
        WorkspacesConnectionStatus.get("LastKnownUserConnectionTimestamp", "unknown"),
    ]
# Write all collected rows to VDI_LIST.csv in the current working directory
# @param data iterable of rows (each row a sequence of cell values)
def exportCSV(data):
    with open("VDI_LIST.csv", "w") as csv_file:
        # Rows are terminated with a bare "\n" rather than the csv default
        csv.writer(csv_file, lineterminator="\n").writerows(data)
    print("Create File is DONE")
# Collect the CSV rows for every WorkSpace registered in one directory
# @param Id Directory ID
# Rows are appended to the module-level "data" list. Any exception is
# reported on stdout together with the directory ID instead of being raised.
def describeWorkspacesByDirectoryId (Id):
    try:
        print(Id + " is started")
        # describe workspaces
        page_iterator_describe = paginator_describe.paginate(DirectoryId=Id,PaginationConfig={'PageSize': 10})
        for page in page_iterator_describe:
            # Build the page's rows eagerly so a failure in parse_list never
            # leaves a partially added page in "data"
            data.extend([parse_list(workspace) for workspace in page["Workspaces"]])
        print(Id + " describe_workspaces is DONE")
    except Exception as e:
        # str(e) rather than e.message: Python 3 exceptions have no "message"
        # attribute, so the old handler itself failed with AttributeError
        print(str(e) + " :: impacted Directory ID is " + Id)
# Process every configured directory, then write the collected rows to CSV.
# An explicit loop is required: map() is lazy on Python 3, so an unconsumed
# map() call would never run describeWorkspacesByDirectoryId and the CSV
# would contain only the header row.
for directory_id in DirectoryIds:
    describeWorkspacesByDirectoryId(directory_id)
exportCSV(data)
Recommended Posts